Warning tests are failing in this version
[nbds] / txn / txn.c
1 /* 
2  * Written by Josh Dybnis and released to the public domain, as explained at
3  * http://creativecommons.org/licenses/publicdomain
4  */
5 #include "common.h"
6 #include "txn.h"
7 #include "mem.h"
8 #include "skiplist.h"
9
10 #define UNDETERMINED_VERSION 0
11 #define ABORTED_VERSION      TAG_VALUE(0, TAG1)
12 #define INITIAL_WRITES_SIZE  4
13
14 typedef struct update_rec update_t;
15
16 struct update_rec {
17     uint64_t version;
18     map_val_t value;
19     map_val_t next; // an earlier update
20 };
21
22 typedef struct write_rec {
23     map_key_t key;
24     update_t *rec; 
25 } write_rec_t;
26
27 struct txn {
28     uint64_t rv;
29     uint64_t wv;
30     map_t *map;
31     write_rec_t *writes;
32     uint64_t writes_size;
33     uint64_t writes_count;
34     uint64_t writes_scan;
35     txn_state_e state;
36 };
37
38 static txn_state_e txn_validate (txn_t *txn);
39
40 static uint64_t version_ = 1;
41
42 static skiplist_t *active_ = NULL;
43
44 void txn_init (void) {
45     active_ = sl_alloc(NULL);
46 }
47
48 // Validate the updates for <key>. Validation fails if there is a write-write conflict. That is if after our 
49 // read version another transaction committed a change to an entry we are also trying to change.
50 //
51 // If we encounter a potential conflict with a transaction that is in the process of validating, we help it 
52 // complete validating. It must be finished before we can decide to rollback or commit.
53 //
54 static txn_state_e validate_key (txn_t *txn, map_key_t key) {
55     assert(txn->state != TXN_RUNNING);
56     
57     map_val_t val = map_get(txn->map, key);
58     update_t *update = NULL;
59     for (; val != DOES_NOT_EXIST; val = update->next) {
60
61         // If the update or its version is not tagged it means the update is committed.
62         //
63         // We can stop at the first committed record we find that is at least as old as our read version. All 
64         // the other committed records following it will be older. And all the uncommitted records following it 
65         // will eventually conflict with it and abort.
66         if (!IS_TAGGED(val, TAG2))
67             return TXN_VALIDATED;
68         update = (update_t *)(size_t)STRIP_TAG(val, TAG2);
69         if (!IS_TAGGED(update->version, TAG1)) 
70             return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED;
71
72         // If the update's version is tagged then either the update was aborted or the the version number is 
73         // actually a pointer to a running transaction's txn_t.
74
75         // Skip aborted transactions.
76         if (EXPECT_FALSE(update->version == ABORTED_VERSION))
77             continue;
78
79         // The update's transaction is still in progress. Access its txn_t.
80         txn_t *writer = (txn_t *)(size_t)STRIP_TAG(update->version, TAG1);
81         if (writer == txn)
82             continue; // Skip our own updates.
83         txn_state_e writer_state = writer->state;
84
85         // Any running transaction will only be able to acquire a wv greater than ours. A transaction changes its 
86         // state to validating before aquiring a wv. We can ignore an unvalidated transaction if its version is
87         // greater than ours. See next comment below for why. 
88         if (writer_state == TXN_RUNNING)
89             continue; 
90         
91         // If <writer> has a later version than us we can safely ignore its updates. It will not commit until
92         // we have completed validation (in order to remain non-blocking it will help us validate if necessary). 
93         // This protocol ensures a deterministic resolution to every conflict and avoids infinite ping-ponging 
94         // between validating two conflicting transactions.
95         if (writer_state == TXN_VALIDATING) {
96             if (writer->wv > txn->wv)
97                 continue;
98             // Help <writer> commit. We need to know if <writer> aborts or commits before we can decide what to
99             // do. But we don't want to block, so we assist.
100             writer_state = txn_validate(writer);
101         }
102
103         // Skip updates from aborted transactions.
104         if (writer_state == TXN_ABORTED)
105             continue;
106
107         assert(writer_state == TXN_VALIDATED);
108         return (writer->wv <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED;
109     }
110
111     return TXN_VALIDATED;
112 }
113
114 static txn_state_e txn_validate (txn_t *txn) {
115     assert(txn->state != TXN_RUNNING);
116     int i;
117     switch (txn->state) {
118
119         case TXN_VALIDATING:
120             if (txn->wv == UNDETERMINED_VERSION) {
121                 uint64_t wv = SYNC_ADD(&version_, 1);
122                 SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv);
123             }
124
125             for (i = 0; i < txn->writes_count; ++i) {
126                 txn_state_e s = validate_key(txn, txn->writes[i].key);
127                 if (s == TXN_ABORTED) {
128                     txn->state = TXN_ABORTED;
129                     break;
130                 }
131                 assert(s == TXN_VALIDATED);
132             }
133             if (txn->state == TXN_VALIDATING) {
134                 txn->state =  TXN_VALIDATED;
135             }
136             break;
137
138         case TXN_VALIDATED:
139         case TXN_ABORTED:
140             break;
141
142         default:
143             assert(FALSE);
144     }
145
146     return txn->state;
147 }
148
149 static update_t *alloc_update_rec (void) {
150     update_t *u = (update_t *)nbd_malloc(sizeof(update_t));
151     memset(u, 0, sizeof(update_t));
152     return u;
153 }
154
155 txn_t *txn_begin (map_t *map) {
156     txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t));
157     memset(txn, 0, sizeof(txn_t));
158     txn->wv = UNDETERMINED_VERSION;
159     txn->state = TXN_RUNNING;
160     txn->map = map;
161     txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE);
162     txn->writes_size = INITIAL_WRITES_SIZE;
163
164     // acquire the read version for txn. must be careful to avoid a race
165     do {
166         txn->rv = version_;
167
168         uint64_t old_count;
169         uint64_t temp = 0;
170         do {
171             old_count = temp;
172             temp = (uint64_t)sl_cas(active_, (map_key_t)txn->rv, old_count, old_count + 1);
173         } while (temp != old_count);
174
175         if (txn->rv == version_)
176             break;
177
178         temp = 1;
179         do {
180             old_count = temp;
181             temp = sl_cas(active_, (map_key_t)txn->rv, old_count, old_count - 1);
182         } while (temp != old_count);
183     } while (1);
184
185     return txn;
186 }
187
188 void txn_abort (txn_t *txn) {
189     if (txn->state != TXN_RUNNING)
190         return; // TODO: return some sort of error code
191
192     int i;
193     for (i = 0; i < txn->writes_count; ++i) {
194         update_t *update = (update_t *)txn->writes[i].rec;
195         update->version = ABORTED_VERSION;
196     }
197
198     nbd_defer_free(txn->writes);
199     nbd_defer_free(txn);
200 }
201
202 txn_state_e txn_commit (txn_t *txn) {
203     if (txn->state != TXN_RUNNING)
204         return txn->state; // TODO: return some sort of error code
205
206     assert(txn->state == TXN_RUNNING);
207     txn->state = TXN_VALIDATING;
208     txn_state_e state = txn_validate(txn);
209
210     // Detach <txn> from its updates.
211     uint64_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv;
212     int i;
213     for (i = 0; i < txn->writes_count; ++i) {
214         update_t *update = (update_t *)txn->writes[i].rec;
215         update->version = wv;
216     }
217
218     // Lower the reference count for <txn>'s read version
219     uint64_t temp = 2;
220     uint64_t old_count;
221     do {
222         old_count = temp;
223         temp = sl_cas(active_, (map_key_t)txn->rv, old_count, old_count - 1);
224         if (temp == 1 && txn->rv != version_) {
225             sl_remove(active_, (map_key_t)txn->rv);
226             break;
227         }
228     } while (old_count != temp);
229
230     nbd_defer_free(txn->writes);
231     nbd_defer_free(txn);
232
233     return state;
234 }
235
236 // Get most recent committed version prior to our read version.
237 uint64_t txn_map_get (txn_t *txn, map_key_t key) {
238     if (txn->state != TXN_RUNNING)
239         return ERROR_TXN_NOT_RUNNING;
240
241     // Iterate through the update records to find the latest committed version prior to our read version. 
242     uint64_t newest_val = map_get(txn->map, key);
243     uint64_t val = newest_val;
244     update_t *update = NULL;
245     for ( ; ; val = update->next) {
246
247         if (!IS_TAGGED(val, TAG2))
248             return val;
249
250         update = (update_t *)(size_t)STRIP_TAG(val, TAG2);
251         assert(update != NULL);
252
253         // If the update's version is not tagged it means the update is committed.
254         if (!IS_TAGGED(update->version, TAG1)) {
255             if (update->version <= txn->rv)
256                 break; // success
257             continue;
258         }
259
260         // If the update's version is tagged then either the update was aborted or the the version number is 
261         // actually a pointer to a running transaction's txn_t.
262
263         // Skip updates from aborted transactions.
264         if (EXPECT_FALSE(update->version == ABORTED_VERSION))
265             continue;
266
267         // The update's transaction is still in progress. Access its txn_t.
268         txn_t *writer = (txn_t *)(size_t)STRIP_TAG(update->version, TAG1);
269         if (writer == txn) // found our own update
270             break; // success 
271
272         txn_state_e writer_state = writer->state;
273         if (writer_state == TXN_RUNNING)
274             continue; 
275
276         if (writer_state == TXN_VALIDATING) {
277             if (writer->wv > txn->rv)
278                 continue;
279             writer_state = txn_validate(writer);
280         }
281
282         // Skip updates from aborted transactions.
283         if (writer_state == TXN_ABORTED)
284             continue;
285
286         assert(writer_state == TXN_VALIDATED);
287         if (writer->wv > txn->rv)
288             continue;
289         break; // success
290     }
291
292     uint64_t value = update->value;
293
294     // collect some garbage
295     uint64_t min_active_version = UNDETERMINED_VERSION;
296     update_t *next_update = NULL;
297     if (IS_TAGGED(update->next, TAG2)) {
298         next_update = (update_t *)(size_t)STRIP_TAG(update->next, TAG2);
299         min_active_version = (uint64_t)sl_min_key(active_);
300         if (next_update->version < min_active_version) {
301             // <next_update> (and all update records following it [execpt if it is aborted]) is old enough that it is
302             // not visible to any active transaction. We can safely free it.
303
304             // Skip over aborted versions to look for more recent updates
305             update_t *temp = next_update;
306             while (temp->version == ABORTED_VERSION) {
307                 assert(!IS_TAGGED(temp->version, TAG1));
308                 uint64_t next = next_update->next;
309                 if (!IS_TAGGED(next, TAG2))
310                     break;
311
312                 temp = (update_t *)(size_t)STRIP_TAG(next, TAG2);
313                 if (temp->version >= min_active_version)
314                     return value;
315             }
316
317             // free <next> and all the update records following it
318             temp = next_update;
319             while (1) {
320                 uint64_t next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST);
321
322                 // if we find ourself in a race just back off and let the other thread take care of it
323                 if (next == DOES_NOT_EXIST) 
324                     return value;
325
326                 if (!IS_TAGGED(next, TAG2))
327                     break;
328
329                 temp = (update_t *)(size_t)STRIP_TAG(next, TAG2);
330                 nbd_free(update);
331             }
332         }
333     }
334
335     // If there is one item left and it is visible by all active transactions we can merge it into the map itself.
336     // There is no need for an update record.
337     if (next_update == NULL && val == newest_val) {
338         if (min_active_version == UNDETERMINED_VERSION) {
339             min_active_version = (uint64_t)sl_min_key(active_);
340         }
341         if (update->version <= min_active_version) {
342             if (map_cas(txn->map, key, TAG_VALUE(val, TAG2), value) == TAG_VALUE(val, TAG2)) {
343                 nbd_defer_free(update);
344             }
345         }
346     }
347     
348     return value;
349 }
350
351 void txn_map_set (txn_t *txn, map_key_t key, uint64_t value) {
352     if (txn->state != TXN_RUNNING)
353         return; // TODO: return some sort of error code
354
355     // create a new update record
356     update_t *update = alloc_update_rec();
357     update->value = value;
358     update->version = TAG_VALUE((uint64_t)(size_t)txn, TAG1);
359
360     // push the new update record onto <key>'s update list
361     uint64_t old_update;
362     do {
363         old_update = map_get(txn->map, key);
364         update->next = old_update;
365     } while (map_cas(txn->map, key, old_update, TAG_VALUE((uint64_t)(size_t)update, TAG2)) != old_update);
366
367     // add <key> to the write set for commit-time validation
368     if (txn->writes_count == txn->writes_size) {
369         write_rec_t *w = nbd_malloc(sizeof(write_rec_t) * txn->writes_size * 2);
370         memcpy(w, txn->writes, txn->writes_size * sizeof(write_rec_t));
371         txn->writes_size *= 2;
372         nbd_free(txn->writes);
373         txn->writes = w;
374     }
375     int i = txn->writes_count++;
376     txn->writes[i].key = key;
377     txn->writes[i].rec = update;
378 }