]> pd.if.org Git - nbds/blob - txn/txn.c
5648d16abbba2b2d11b3f0b75bbbb267251746cb
[nbds] / txn / txn.c
1 /* 
2  * Written by Josh Dybnis and released to the public domain, as explained at
3  * http://creativecommons.org/licenses/publicdomain
4  */
5 #include "common.h"
6 #include "txn.h"
7 #include "mem.h"
8 #include "skiplist.h"
9
10 #define UNDETERMINED_VERSION 0
11 #define ABORTED_VERSION      TAG_VALUE(0, TAG1)
12 #define INITIAL_WRITES_SIZE  4
13
14 typedef struct update_rec update_t;
15 typedef map_key_t version_t;
16
17 struct update_rec {
18     version_t version;
19     map_val_t value;
20     map_val_t next; // an earlier update
21 };
22
23 typedef struct write_rec {
24     map_key_t key;
25     update_t *rec; 
26 } write_rec_t;
27
28 struct txn {
29     version_t rv;
30     version_t wv;
31     map_t *map;
32     write_rec_t *writes;
33     size_t writes_size;
34     size_t writes_count;
35     size_t writes_scan;
36     txn_state_e state;
37 };
38
39 static txn_state_e txn_validate (txn_t *txn);
40
41 static version_t version_ = 1;
42
43 static skiplist_t *active_ = NULL;
44
45 void txn_init (void) {
46     active_ = sl_alloc(NULL);
47 }
48
49 // Validate the updates for <key>. Validation fails if there is a write-write conflict. That is if after our 
50 // read version another transaction committed a change to an entry we are also trying to change.
51 //
52 // If we encounter a potential conflict with a transaction that is in the process of validating, we help it 
53 // complete validating. It must be finished before we can decide to rollback or commit.
54 //
55 static txn_state_e validate_key (txn_t *txn, map_key_t key) {
56     assert(txn->state != TXN_RUNNING);
57     
58     map_val_t val = map_get(txn->map, key);
59     update_t *update = NULL;
60     for (; val != DOES_NOT_EXIST; val = update->next) {
61
62         // If the update or its version is not tagged it means the update is committed.
63         //
64         // We can stop at the first committed record we find that is at least as old as our read version. All 
65         // the other committed records following it will be older. And all the uncommitted records following it 
66         // will eventually conflict with it and abort.
67         if (!IS_TAGGED(val, TAG2))
68             return TXN_VALIDATED;
69         update = (update_t *)STRIP_TAG(val, TAG2);
70         if (!IS_TAGGED(update->version, TAG1)) 
71             return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED;
72
73         // If the update's version is tagged then either the update was aborted or the the version number is 
74         // actually a pointer to a running transaction's txn_t.
75
76         // Skip aborted transactions.
77         if (EXPECT_FALSE(update->version == ABORTED_VERSION))
78             continue;
79
80         // The update's transaction is still in progress. Access its txn_t.
81         txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1);
82         if (writer == txn)
83             continue; // Skip our own updates.
84         txn_state_e writer_state = writer->state;
85
86         // Any running transaction will only be able to acquire a wv greater than ours. A transaction changes its 
87         // state to validating before aquiring a wv. We can ignore an unvalidated transaction if its version is
88         // greater than ours. See next comment below for why. 
89         if (writer_state == TXN_RUNNING)
90             continue; 
91         
92         // If <writer> has a later version than us we can safely ignore its updates. It will not commit until
93         // we have completed validation (in order to remain non-blocking it will help us validate if necessary). 
94         // This protocol ensures a deterministic resolution to every conflict and avoids infinite ping-ponging 
95         // between validating two conflicting transactions.
96         if (writer_state == TXN_VALIDATING) {
97             if (writer->wv > txn->wv)
98                 continue;
99             // Help <writer> commit. We need to know if <writer> aborts or commits before we can decide what to
100             // do. But we don't want to block, so we assist.
101             writer_state = txn_validate(writer);
102         }
103
104         // Skip updates from aborted transactions.
105         if (writer_state == TXN_ABORTED)
106             continue;
107
108         assert(writer_state == TXN_VALIDATED);
109         return (writer->wv <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED;
110     }
111
112     return TXN_VALIDATED;
113 }
114
115 static txn_state_e txn_validate (txn_t *txn) {
116     assert(txn->state != TXN_RUNNING);
117     int i;
118     switch (txn->state) {
119
120         case TXN_VALIDATING:
121             if (txn->wv == UNDETERMINED_VERSION) {
122                 version_t wv = SYNC_ADD(&version_, 1);
123                 SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv);
124             }
125
126             for (i = 0; i < txn->writes_count; ++i) {
127                 txn_state_e s = validate_key(txn, txn->writes[i].key);
128                 if (s == TXN_ABORTED) {
129                     txn->state = TXN_ABORTED;
130                     break;
131                 }
132                 assert(s == TXN_VALIDATED);
133             }
134             if (txn->state == TXN_VALIDATING) {
135                 txn->state =  TXN_VALIDATED;
136             }
137             break;
138
139         case TXN_VALIDATED:
140         case TXN_ABORTED:
141             break;
142
143         default:
144             assert(FALSE);
145     }
146
147     return txn->state;
148 }
149
150 static update_t *alloc_update_rec (void) {
151     update_t *u = (update_t *)nbd_malloc(sizeof(update_t));
152     memset(u, 0, sizeof(update_t));
153     return u;
154 }
155
156 txn_t *txn_begin (map_t *map) {
157     txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t));
158     memset(txn, 0, sizeof(txn_t));
159     txn->wv = UNDETERMINED_VERSION;
160     txn->state = TXN_RUNNING;
161     txn->map = map;
162     txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE);
163     txn->writes_size = INITIAL_WRITES_SIZE;
164
165     // acquire the read version for txn. must be careful to avoid a race
166     do {
167         txn->rv = version_;
168
169         unsigned old_count;
170         unsigned temp = 0;
171         do {
172             old_count = temp;
173             temp = sl_cas(active_, txn->rv, old_count, old_count + 1);
174         } while (temp != old_count);
175
176         if (txn->rv == version_)
177             break;
178
179         temp = 1;
180         do {
181             old_count = temp;
182             temp = sl_cas(active_, (map_key_t)txn->rv, old_count, old_count - 1);
183         } while (temp != old_count);
184     } while (1);
185
186     return txn;
187 }
188
189 void txn_abort (txn_t *txn) {
190     if (txn->state != TXN_RUNNING)
191         return; // TODO: return some sort of error code
192
193     int i;
194     for (i = 0; i < txn->writes_count; ++i) {
195         update_t *update = (update_t *)txn->writes[i].rec;
196         update->version = ABORTED_VERSION;
197     }
198
199     nbd_defer_free(txn->writes);
200     nbd_defer_free(txn);
201 }
202
203 txn_state_e txn_commit (txn_t *txn) {
204     if (txn->state != TXN_RUNNING)
205         return txn->state; // TODO: return some sort of error code
206
207     assert(txn->state == TXN_RUNNING);
208     txn->state = TXN_VALIDATING;
209     txn_state_e state = txn_validate(txn);
210
211     // Detach <txn> from its updates.
212     version_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv;
213     int i;
214     for (i = 0; i < txn->writes_count; ++i) {
215         update_t *update = (update_t *)txn->writes[i].rec;
216         update->version = wv;
217     }
218
219     // Lower the reference count for <txn>'s read version
220     unsigned temp = 2;
221     unsigned old_count;
222     do {
223         old_count = temp;
224         temp = sl_cas(active_, (map_key_t)txn->rv, old_count, old_count - 1);
225         if (temp == 1 && txn->rv != version_) {
226             sl_remove(active_, (map_key_t)txn->rv);
227             break;
228         }
229     } while (old_count != temp);
230
231     nbd_defer_free(txn->writes);
232     nbd_defer_free(txn);
233
234     return state;
235 }
236
237 // Get most recent committed version prior to our read version.
238 map_val_t txn_map_get (txn_t *txn, map_key_t key) {
239     if (txn->state != TXN_RUNNING)
240         return ERROR_TXN_NOT_RUNNING;
241
242     // Iterate through the update records to find the latest committed version prior to our read version. 
243     map_val_t newest_val = map_get(txn->map, key);
244     map_val_t val = newest_val;
245     update_t *update = NULL;
246     for ( ; ; val = update->next) {
247
248         if (!IS_TAGGED(val, TAG2))
249             return val;
250
251         update = (update_t *)STRIP_TAG(val, TAG2);
252         assert(update != NULL);
253
254         // If the update's version is not tagged it means the update is committed.
255         if (!IS_TAGGED(update->version, TAG1)) {
256             if (update->version <= txn->rv)
257                 break; // success
258             continue;
259         }
260
261         // If the update's version is tagged then either the update was aborted or the the version number is 
262         // actually a pointer to a running transaction's txn_t.
263
264         // Skip updates from aborted transactions.
265         if (EXPECT_FALSE(update->version == ABORTED_VERSION))
266             continue;
267
268         // The update's transaction is still in progress. Access its txn_t.
269         txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1);
270         if (writer == txn) // found our own update
271             break; // success 
272
273         txn_state_e writer_state = writer->state;
274         if (writer_state == TXN_RUNNING)
275             continue; 
276
277         if (writer_state == TXN_VALIDATING) {
278             if (writer->wv > txn->rv)
279                 continue;
280             writer_state = txn_validate(writer);
281         }
282
283         // Skip updates from aborted transactions.
284         if (writer_state == TXN_ABORTED)
285             continue;
286
287         assert(writer_state == TXN_VALIDATED);
288         if (writer->wv > txn->rv)
289             continue;
290         break; // success
291     }
292
293     map_val_t value = update->value;
294
295     // collect some garbage
296     version_t min_active_version = UNDETERMINED_VERSION;
297     update_t *next_update = NULL;
298     if (IS_TAGGED(update->next, TAG2)) {
299         next_update = (update_t *)STRIP_TAG(update->next, TAG2);
300         min_active_version = (version_t)sl_min_key(active_);
301         if (next_update->version < min_active_version) {
302             // <next_update> (and all update records following it [execpt if it is aborted]) is old enough that it is
303             // not visible to any active transaction. We can safely free it.
304
305             // Skip over aborted versions to look for more recent updates
306             update_t *temp = next_update;
307             while (temp->version == ABORTED_VERSION) {
308                 assert(!IS_TAGGED(temp->version, TAG1));
309                 map_val_t next = next_update->next;
310                 if (!IS_TAGGED(next, TAG2))
311                     break;
312
313                 temp = (update_t *)STRIP_TAG(next, TAG2);
314                 if (temp->version >= min_active_version)
315                     return value;
316             }
317
318             // free <next> and all the update records following it
319             temp = next_update;
320             while (1) {
321                 map_val_t next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST);
322
323                 // if we find ourself in a race just back off and let the other thread take care of it
324                 if (next == DOES_NOT_EXIST) 
325                     return value;
326
327                 if (!IS_TAGGED(next, TAG2))
328                     break;
329
330                 temp = (update_t *)STRIP_TAG(next, TAG2);
331                 nbd_free(update);
332             }
333         }
334     }
335
336     // If there is one item left and it is visible by all active transactions we can merge it into the map itself.
337     // There is no need for an update record.
338     if (next_update == NULL && val == newest_val) {
339         if (min_active_version == UNDETERMINED_VERSION) {
340             min_active_version = (version_t)sl_min_key(active_);
341         }
342         if (update->version <= min_active_version) {
343             if (map_cas(txn->map, key, TAG_VALUE(val, TAG2), value) == TAG_VALUE(val, TAG2)) {
344                 nbd_defer_free(update);
345             }
346         }
347     }
348     
349     return value;
350 }
351
352 void txn_map_set (txn_t *txn, map_key_t key, map_val_t value) {
353     if (txn->state != TXN_RUNNING)
354         return; // TODO: return some sort of error code
355
356     // create a new update record
357     update_t *update = alloc_update_rec();
358     update->value = value;
359     update->version = TAG_VALUE((version_t)txn, TAG1);
360
361     // push the new update record onto <key>'s update list
362     map_val_t old_update;
363     do {
364         old_update = map_get(txn->map, key);
365         update->next = old_update;
366     } while (map_cas(txn->map, key, old_update, TAG_VALUE((map_val_t)update, TAG2)) != old_update);
367
368     // add <key> to the write set for commit-time validation
369     if (txn->writes_count == txn->writes_size) {
370         write_rec_t *w = nbd_malloc(sizeof(write_rec_t) * txn->writes_size * 2);
371         memcpy(w, txn->writes, txn->writes_size * sizeof(write_rec_t));
372         txn->writes_size *= 2;
373         nbd_free(txn->writes);
374         txn->writes = w;
375     }
376     int i = txn->writes_count++;
377     txn->writes[i].key = key;
378     txn->writes[i].rec = update;
379 }