ea3d6df4ccbb5729fc0447218979b0bafbee703e
[nbds] / txn / txn.c
1 /* 
2  * Written by Josh Dybnis and released to the public domain, as explained at
3  * http://creativecommons.org/licenses/publicdomain
4  */
5 #include "common.h"
6 #include "txn.h"
7 #include "mem.h"
8 #include "skiplist.h"
9
10 #define UNDETERMINED_VERSION 0
11 #define ABORTED_VERSION      TAG_VALUE(0, TAG1)
12 #define INITIAL_WRITES_SIZE  4
13
14 typedef struct update_rec update_t;
15
16 struct update_rec {
17     update_t *next; // an earlier update
18     uint64_t version;
19     uint64_t value;
20 };
21
22 typedef struct write_rec {
23     void *key;
24     update_t *rec; 
25 } write_rec_t;
26
27 struct txn {
28     uint64_t rv;
29     uint64_t wv;
30     map_t *map;
31     write_rec_t *writes;
32     uint32_t writes_size;
33     uint32_t writes_count;
34     uint32_t writes_scan;
35     txn_type_e type;
36     txn_state_e state;
37 };
38
39 static uint64_t version_ = 1;
40
41 static txn_state_e txn_validate (txn_t *txn);
42
43 static skiplist_t *active_ = NULL;
44
45 void txn_init (void) {
46     active_ = sl_alloc(NULL);
47 }
48
49 // Validate the updates for <key>. Validation fails if there is a write-write conflict. That is if after our 
50 // read version another transaction committed a change to an entry we are also trying to change.
51 //
52 // If we encounter a potential conflict with a transaction that is in the process of validating, we help it 
53 // complete validating. It must be finished before we can decide to rollback or commit.
54 //
55 static txn_state_e tm_validate_key (txn_t *txn, void *key) {
56     
57     update_t *update = (update_t *) map_get(txn->map, key);
58     for (; update != NULL; update = update->next) {
59
60         // If the update or its version is not tagged it means the update is committed.
61         //
62         // We can stop at the first committed record we find that is at least as old as our read version. All 
63         // the other committed records following it will be older. And all the uncommitted records following it 
64         // will eventually conflict with it and abort.
65         if (!IS_TAGGED(update, TAG2))
66             return TXN_VALIDATED;
67         update = (update_t *)STRIP_TAG(update, TAG2);
68         if (!IS_TAGGED(update->version, TAG1)) 
69             return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED;
70
71         // If the update's version is tagged then either the update was aborted or the the version number is 
72         // actually a pointer to a running transaction's txn_t.
73
74         // Skip aborted transactions.
75         if (EXPECT_FALSE(update->version == ABORTED_VERSION))
76             continue;
77
78         // The update's transaction is still in progress. Access its txn_t.
79         txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1);
80         if (writer == txn)
81             continue; // Skip our own updates.
82         txn_state_e writer_state = writer->state;
83
84         // Any running transaction will only be able to acquire a wv greater than ours. A transaction changes its 
85         // state to validating before aquiring a wv. We can ignore an unvalidated transaction if its version is
86         // greater than ours. See next comment below for why. 
87         if (writer_state == TXN_RUNNING)
88             continue; 
89         
90         // If <writer> has a later version than us we can safely ignore its updates. It will not commit until
91         // we have completed validation (in order to remain non-blocking it will help us validate if necessary). 
92         // This protocol ensures a deterministic resolution to every conflict and avoids infinite ping-ponging 
93         // between validating two conflicting transactions.
94         if (writer_state == TXN_VALIDATING) {
95             if (writer->wv > txn->wv)
96                 continue;
97             // Help <writer> commit. We need to know if <writer> aborts or commits before we can decide what to
98             // do. But we don't want to block, so we assist.
99             writer_state = txn_validate(writer);
100         }
101
102         // Skip updates from aborted transactions.
103         if (writer_state == TXN_ABORTED)
104             continue;
105
106         assert(writer_state == TXN_VALIDATED);
107         return (writer->wv <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED;
108     }
109
110     return TXN_VALIDATED;
111 }
112
113 static txn_state_e txn_validate (txn_t *txn) {
114     int i;
115     switch (txn->state) {
116
117         case TXN_VALIDATING:
118             if (txn->wv == UNDETERMINED_VERSION) {
119                 uint64_t wv = SYNC_ADD(&version_, 1);
120                 SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv);
121             }
122
123             for (i = 0; i < txn->writes_count; ++i) {
124                 txn_state_e s = tm_validate_key(txn, txn->writes[i].key);
125                 if (s == TXN_ABORTED) {
126                     txn->state = TXN_ABORTED;
127                     break;
128                 }
129             }
130             if (txn->state == TXN_VALIDATING) {
131                 txn->state =  TXN_VALIDATED;
132             }
133             break;
134
135         case TXN_VALIDATED:
136         case TXN_ABORTED:
137             break;
138
139         default:
140             assert(FALSE);
141     }
142
143     return txn->state;
144 }
145
146 static update_t *alloc_update_rec (void) {
147     update_t *u = (update_t *)nbd_malloc(sizeof(update_t));
148     memset(u, 0, sizeof(update_t));
149     return u;
150 }
151
152 txn_t *txn_begin (txn_type_e type, map_t *map) {
153     txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t));
154     memset(txn, 0, sizeof(txn_t));
155     txn->type = type;
156     txn->wv = UNDETERMINED_VERSION;
157     txn->state = TXN_RUNNING;
158     txn->map = map;
159     if (type != TXN_READ_ONLY) {
160         txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE);
161         txn->writes_size = INITIAL_WRITES_SIZE;
162     }
163
164     // acquire the read version for txn. must be careful to avoid a race
165     do {
166         txn->rv = version_;
167
168         uint64_t old_count;
169         uint64_t temp = 0;
170         do {
171             old_count = temp;
172             temp = (uint64_t)sl_cas(active_, (void *)txn->rv, old_count, old_count + 1);
173         } while (temp != old_count);
174
175         if (txn->rv == version_)
176             break;
177
178         temp = 1;
179         do {
180             old_count = temp;
181             temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
182         } while (temp != old_count);
183     } while (1);
184
185     return txn;
186 }
187
188 void txn_abort (txn_t *txn) {
189
190     int i;
191     for (i = 0; i < txn->writes_count; ++i) {
192         update_t *update = (update_t *)txn->writes[i].rec;
193         update->version = ABORTED_VERSION;
194     }
195
196     nbd_defer_free(txn->writes);
197     nbd_defer_free(txn);
198 }
199
200 txn_state_e txn_commit (txn_t *txn) {
201
202     assert(txn->state == TXN_RUNNING);
203     txn->state = TXN_VALIDATING;
204     txn_state_e state = txn_validate(txn);
205
206     // Detach <txn> from its updates.
207     uint64_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv;
208     int i;
209     for (i = 0; i < txn->writes_count; ++i) {
210         update_t *update = (update_t *)txn->writes[i].rec;
211         update->version = wv;
212     }
213
214     // Lower the reference count for <txn>'s read version
215     uint64_t temp = 2;
216     uint64_t old_count;
217     do {
218         old_count = temp;
219         temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
220         if (temp == 1 && txn->rv != version_) {
221             sl_remove(active_, (void *)txn->rv);
222             break;
223         }
224     } while (old_count != temp);
225
226     nbd_defer_free(txn->writes);
227     nbd_defer_free(txn);
228
229     return state;
230 }
231
232 // Get most recent committed version prior to our read version.
233 uint64_t tm_get (txn_t *txn, void *key) {
234
235     update_t *newest_update = (update_t *) map_get(txn->map, key);
236     if (!IS_TAGGED(newest_update, TAG2))
237             return (uint64_t)newest_update;
238
239     // Iterate through the update records to find the latest committed version prior to our read version. 
240     update_t *update;
241     for (update = newest_update; ; update = update->next) {
242
243         if (!IS_TAGGED(update, TAG2))
244             return (uint64_t)update;
245
246         update = (update_t *)STRIP_TAG(update, TAG2);
247         assert(update != NULL);
248
249         // If the update's version is not tagged it means the update is committed.
250         if (!IS_TAGGED(update->version, TAG1)) {
251             if (update->version <= txn->rv)
252                 break; // success
253             continue;
254         }
255
256         // If the update's version is tagged then either the update was aborted or the the version number is 
257         // actually a pointer to a running transaction's txn_t.
258
259         // Skip updates from aborted transactions.
260         if (EXPECT_FALSE(update->version == ABORTED_VERSION))
261             continue;
262
263         // The update's transaction is still in progress. Access its txn_t.
264         txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1);
265         if (writer == txn) // found our own update
266             break; // success 
267
268         txn_state_e writer_state = writer->state;
269         if (writer_state == TXN_RUNNING)
270             continue; 
271
272         if (writer_state == TXN_VALIDATING) {
273             if (writer->wv > txn->rv)
274                 continue;
275             writer_state = txn_validate(writer);
276         }
277
278         // Skip updates from aborted transactions.
279         if (writer_state == TXN_ABORTED)
280             continue;
281
282         assert(writer_state == TXN_VALIDATED);
283         if (writer->wv > txn->rv)
284             continue;
285         break; // success
286     }
287
288     uint64_t value = update->value;
289
290     // collect some garbage
291     update_t *last = update;
292     update_t *next = update->next;
293     uint64_t min_active = 0;
294     if (IS_TAGGED(next, TAG2)) {
295         next = (update_t *)STRIP_TAG(next, TAG2);
296         min_active = (uint64_t)sl_min_key(active_);
297         if (next->version < min_active) {
298
299             // Skip over aborted versions to verify the chain of updates is old enough for collection
300             update_t *temp = next;
301             while (temp->version == ABORTED_VERSION) {
302                 assert(!IS_TAGGED(temp->version, TAG1));
303                 update_t *temp = next->next;
304                 if (!IS_TAGGED(temp, TAG2))
305                     break;
306                 temp = (update_t *)STRIP_TAG(temp, TAG2);
307                 if (temp->version >= min_active)
308                     return value;
309                 temp = temp->next;
310             }
311
312             // collect <next> and all the update records following it
313             do {
314                 next = SYNC_SWAP(&update->next, NULL);
315
316                 // if we find ourself in a race just back off and let the other thread take care of it
317                 if (next == NULL) 
318                     return value;
319
320                 update = next;
321                 next = next->next;
322                 nbd_free(update);
323             } while (IS_TAGGED(next, TAG2));
324         }
325     }
326
327     // If there is one item left and it is visible by all active transactions we can merge it into the map itself.
328     // There is no need for an update record.
329     if (next == NULL && last == (update_t *)STRIP_TAG(newest_update, TAG2)) {
330         if (min_active == UNDETERMINED_VERSION) {
331             min_active = (uint64_t)sl_min_key(active_);
332         }
333         if (last->version <= min_active) {
334             if (map_cas(txn->map, key, TAG_VALUE(last, TAG2), value) == TAG_VALUE(last, TAG2)) {
335                 nbd_defer_free(last);
336             }
337         }
338     } 
339     
340     return value;
341 }
342
343 void tm_set (txn_t *txn, void *key, uint64_t value) {
344
345     // create a new update record
346     update_t *update = alloc_update_rec();
347     update->value = value;
348     update->version = TAG_VALUE(txn, TAG1);
349
350     // push the new update record onto <key>'s update list
351     uint64_t old_update;
352     do {
353         old_update = map_get(txn->map, key);
354         update->next = (update_t *)old_update;
355     } while (map_cas(txn->map, key, old_update, TAG_VALUE(update, TAG2)) != old_update);
356
357     // add <key> to the write set for commit-time validation
358     if (txn->writes_count == txn->writes_size) {
359         write_rec_t *w = nbd_malloc(sizeof(write_rec_t) * txn->writes_size * 2);
360         memcpy(w, txn->writes, txn->writes_size * sizeof(write_rec_t));
361         txn->writes_size *= 2;
362         nbd_free(txn->writes);
363         txn->writes = w;
364     }
365     int i = txn->writes_count++;
366     txn->writes[i].key = key;
367     txn->writes[i].rec = update;
368 }