generic map iterator interface
[nbds] / txn / txn.c
1 /* 
2  * Written by Josh Dybnis and released to the public domain, as explained at
3  * http://creativecommons.org/licenses/publicdomain
4  */
5 #include "common.h"
6 #include "txn.h"
7 #include "mem.h"
8 #include "skiplist.h"
9
10 #define UNDETERMINED_VERSION 0
11 #define ABORTED_VERSION      TAG_VALUE(0, TAG1)
12 #define INITIAL_WRITES_SIZE  4
13
14 typedef struct update_rec update_t;
15
16 struct update_rec {
17     update_t *next; // an earlier update
18     uint64_t version;
19     uint64_t value;
20 };
21
22 typedef struct write_rec {
23     void *key;
24     update_t *rec; 
25 } write_rec_t;
26
27 struct txn {
28     uint64_t rv;
29     uint64_t wv;
30     map_t *map;
31     write_rec_t *writes;
32     uint32_t writes_size;
33     uint32_t writes_count;
34     uint32_t writes_scan;
35     txn_state_e state;
36 };
37
38 static uint64_t version_ = 1;
39
40 static txn_state_e txn_validate (txn_t *txn);
41
42 static skiplist_t *active_ = NULL;
43
44 void txn_init (void) {
45     active_ = sl_alloc(NULL);
46 }
47
48 // Validate the updates for <key>. Validation fails if there is a write-write conflict. That is if after our 
49 // read version another transaction committed a change to an entry we are also trying to change.
50 //
51 // If we encounter a potential conflict with a transaction that is in the process of validating, we help it 
52 // complete validating. It must be finished before we can decide to rollback or commit.
53 //
54 static txn_state_e validate_key (txn_t *txn, void *key) {
55     assert(txn->state != TXN_RUNNING);
56     
57     update_t *update = (update_t *) map_get(txn->map, key);
58     for (; update != NULL; update = update->next) {
59
60         // If the update or its version is not tagged it means the update is committed.
61         //
62         // We can stop at the first committed record we find that is at least as old as our read version. All 
63         // the other committed records following it will be older. And all the uncommitted records following it 
64         // will eventually conflict with it and abort.
65         if (!IS_TAGGED(update, TAG2))
66             return TXN_VALIDATED;
67         update = (update_t *)STRIP_TAG(update, TAG2);
68         if (!IS_TAGGED(update->version, TAG1)) 
69             return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED;
70
71         // If the update's version is tagged then either the update was aborted or the the version number is 
72         // actually a pointer to a running transaction's txn_t.
73
74         // Skip aborted transactions.
75         if (EXPECT_FALSE(update->version == ABORTED_VERSION))
76             continue;
77
78         // The update's transaction is still in progress. Access its txn_t.
79         txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1);
80         if (writer == txn)
81             continue; // Skip our own updates.
82         txn_state_e writer_state = writer->state;
83
84         // Any running transaction will only be able to acquire a wv greater than ours. A transaction changes its 
85         // state to validating before aquiring a wv. We can ignore an unvalidated transaction if its version is
86         // greater than ours. See next comment below for why. 
87         if (writer_state == TXN_RUNNING)
88             continue; 
89         
90         // If <writer> has a later version than us we can safely ignore its updates. It will not commit until
91         // we have completed validation (in order to remain non-blocking it will help us validate if necessary). 
92         // This protocol ensures a deterministic resolution to every conflict and avoids infinite ping-ponging 
93         // between validating two conflicting transactions.
94         if (writer_state == TXN_VALIDATING) {
95             if (writer->wv > txn->wv)
96                 continue;
97             // Help <writer> commit. We need to know if <writer> aborts or commits before we can decide what to
98             // do. But we don't want to block, so we assist.
99             writer_state = txn_validate(writer);
100         }
101
102         // Skip updates from aborted transactions.
103         if (writer_state == TXN_ABORTED)
104             continue;
105
106         assert(writer_state == TXN_VALIDATED);
107         return (writer->wv <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED;
108     }
109
110     return TXN_VALIDATED;
111 }
112
113 static txn_state_e txn_validate (txn_t *txn) {
114     assert(txn->state != TXN_RUNNING);
115     int i;
116     switch (txn->state) {
117
118         case TXN_VALIDATING:
119             if (txn->wv == UNDETERMINED_VERSION) {
120                 uint64_t wv = SYNC_ADD(&version_, 1);
121                 SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv);
122             }
123
124             for (i = 0; i < txn->writes_count; ++i) {
125                 txn_state_e s = validate_key(txn, txn->writes[i].key);
126                 if (s == TXN_ABORTED) {
127                     txn->state = TXN_ABORTED;
128                     break;
129                 }
130                 assert(s == TXN_VALIDATED);
131             }
132             if (txn->state == TXN_VALIDATING) {
133                 txn->state =  TXN_VALIDATED;
134             }
135             break;
136
137         case TXN_VALIDATED:
138         case TXN_ABORTED:
139             break;
140
141         default:
142             assert(FALSE);
143     }
144
145     return txn->state;
146 }
147
148 static update_t *alloc_update_rec (void) {
149     update_t *u = (update_t *)nbd_malloc(sizeof(update_t));
150     memset(u, 0, sizeof(update_t));
151     return u;
152 }
153
154 txn_t *txn_begin (map_t *map) {
155     txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t));
156     memset(txn, 0, sizeof(txn_t));
157     txn->wv = UNDETERMINED_VERSION;
158     txn->state = TXN_RUNNING;
159     txn->map = map;
160     txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE);
161     txn->writes_size = INITIAL_WRITES_SIZE;
162
163     // acquire the read version for txn. must be careful to avoid a race
164     do {
165         txn->rv = version_;
166
167         uint64_t old_count;
168         uint64_t temp = 0;
169         do {
170             old_count = temp;
171             temp = (uint64_t)sl_cas(active_, (void *)txn->rv, old_count, old_count + 1);
172         } while (temp != old_count);
173
174         if (txn->rv == version_)
175             break;
176
177         temp = 1;
178         do {
179             old_count = temp;
180             temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
181         } while (temp != old_count);
182     } while (1);
183
184     return txn;
185 }
186
187 void txn_abort (txn_t *txn) {
188     if (txn->state != TXN_RUNNING)
189         return; // TODO: return some sort of error code
190
191     int i;
192     for (i = 0; i < txn->writes_count; ++i) {
193         update_t *update = (update_t *)txn->writes[i].rec;
194         update->version = ABORTED_VERSION;
195     }
196
197     nbd_defer_free(txn->writes);
198     nbd_defer_free(txn);
199 }
200
201 txn_state_e txn_commit (txn_t *txn) {
202     if (txn->state != TXN_RUNNING)
203         return txn->state; // TODO: return some sort of error code
204
205     assert(txn->state == TXN_RUNNING);
206     txn->state = TXN_VALIDATING;
207     txn_state_e state = txn_validate(txn);
208
209     // Detach <txn> from its updates.
210     uint64_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv;
211     int i;
212     for (i = 0; i < txn->writes_count; ++i) {
213         update_t *update = (update_t *)txn->writes[i].rec;
214         update->version = wv;
215     }
216
217     // Lower the reference count for <txn>'s read version
218     uint64_t temp = 2;
219     uint64_t old_count;
220     do {
221         old_count = temp;
222         temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
223         if (temp == 1 && txn->rv != version_) {
224             sl_remove(active_, (void *)txn->rv);
225             break;
226         }
227     } while (old_count != temp);
228
229     nbd_defer_free(txn->writes);
230     nbd_defer_free(txn);
231
232     return state;
233 }
234
235 // Get most recent committed version prior to our read version.
236 uint64_t txn_map_get (txn_t *txn, void *key) {
237     if (txn->state != TXN_RUNNING)
238         return ERROR_TXN_NOT_RUNNING;
239
240     update_t *newest_update = (update_t *) map_get(txn->map, key);
241     if (!IS_TAGGED(newest_update, TAG2))
242             return (uint64_t)newest_update;
243
244     // Iterate through the update records to find the latest committed version prior to our read version. 
245     update_t *update;
246     for (update = newest_update; ; update = update->next) {
247
248         if (!IS_TAGGED(update, TAG2))
249             return (uint64_t)update;
250
251         update = (update_t *)STRIP_TAG(update, TAG2);
252         assert(update != NULL);
253
254         // If the update's version is not tagged it means the update is committed.
255         if (!IS_TAGGED(update->version, TAG1)) {
256             if (update->version <= txn->rv)
257                 break; // success
258             continue;
259         }
260
261         // If the update's version is tagged then either the update was aborted or the the version number is 
262         // actually a pointer to a running transaction's txn_t.
263
264         // Skip updates from aborted transactions.
265         if (EXPECT_FALSE(update->version == ABORTED_VERSION))
266             continue;
267
268         // The update's transaction is still in progress. Access its txn_t.
269         txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1);
270         if (writer == txn) // found our own update
271             break; // success 
272
273         txn_state_e writer_state = writer->state;
274         if (writer_state == TXN_RUNNING)
275             continue; 
276
277         if (writer_state == TXN_VALIDATING) {
278             if (writer->wv > txn->rv)
279                 continue;
280             writer_state = txn_validate(writer);
281         }
282
283         // Skip updates from aborted transactions.
284         if (writer_state == TXN_ABORTED)
285             continue;
286
287         assert(writer_state == TXN_VALIDATED);
288         if (writer->wv > txn->rv)
289             continue;
290         break; // success
291     }
292
293     uint64_t value = update->value;
294
295     // collect some garbage
296     update_t *last = update;
297     update_t *next = update->next;
298     uint64_t min_active = 0;
299     if (IS_TAGGED(next, TAG2)) {
300         next = (update_t *)STRIP_TAG(next, TAG2);
301         min_active = (uint64_t)sl_min_key(active_);
302         if (next->version < min_active) {
303
304             // Skip over aborted versions to verify the chain of updates is old enough for collection
305             update_t *temp = next;
306             while (temp->version == ABORTED_VERSION) {
307                 assert(!IS_TAGGED(temp->version, TAG1));
308                 update_t *temp = next->next;
309                 if (!IS_TAGGED(temp, TAG2))
310                     break;
311                 temp = (update_t *)STRIP_TAG(temp, TAG2);
312                 if (temp->version >= min_active)
313                     return value;
314                 temp = temp->next;
315             }
316
317             // collect <next> and all the update records following it
318             do {
319                 next = SYNC_SWAP(&update->next, NULL);
320
321                 // if we find ourself in a race just back off and let the other thread take care of it
322                 if (next == NULL) 
323                     return value;
324
325                 update = next;
326                 next = next->next;
327                 nbd_free(update);
328             } while (IS_TAGGED(next, TAG2));
329         }
330     }
331
332     // If there is one item left and it is visible by all active transactions we can merge it into the map itself.
333     // There is no need for an update record.
334     if (next == NULL && last == (update_t *)STRIP_TAG(newest_update, TAG2)) {
335         if (min_active == UNDETERMINED_VERSION) {
336             min_active = (uint64_t)sl_min_key(active_);
337         }
338         if (last->version <= min_active) {
339             if (map_cas(txn->map, key, TAG_VALUE(last, TAG2), value) == TAG_VALUE(last, TAG2)) {
340                 nbd_defer_free(last);
341             }
342         }
343     } 
344     
345     return value;
346 }
347
348 void txn_map_set (txn_t *txn, void *key, uint64_t value) {
349     if (txn->state != TXN_RUNNING)
350         return; // TODO: return some sort of error code
351
352     // create a new update record
353     update_t *update = alloc_update_rec();
354     update->value = value;
355     update->version = TAG_VALUE(txn, TAG1);
356
357     // push the new update record onto <key>'s update list
358     uint64_t old_update;
359     do {
360         old_update = map_get(txn->map, key);
361         update->next = (update_t *)old_update;
362     } while (map_cas(txn->map, key, old_update, TAG_VALUE(update, TAG2)) != old_update);
363
364     // add <key> to the write set for commit-time validation
365     if (txn->writes_count == txn->writes_size) {
366         write_rec_t *w = nbd_malloc(sizeof(write_rec_t) * txn->writes_size * 2);
367         memcpy(w, txn->writes, txn->writes_size * sizeof(write_rec_t));
368         txn->writes_size *= 2;
369         nbd_free(txn->writes);
370         txn->writes = w;
371     }
372     int i = txn->writes_count++;
373     txn->writes[i].key = key;
374     txn->writes[i].rec = update;
375 }