]> pd.if.org Git - nbds/blob - txn/txn.c
in txn, clean up old update records when they can't be referenced anymore
[nbds] / txn / txn.c
1 /* 
2  * Written by Josh Dybnis and released to the public domain, as explained at
3  * http://creativecommons.org/licenses/publicdomain
4  */
5 #include "common.h"
6 #include "txn.h"
7 #include "mem.h"
8 #include "skiplist.h"
9
10 #define UNDETERMINED_VERSION 0
11 #define ABORTED_VERSION      TAG_VALUE(0)
12 #define INITIAL_WRITES_SIZE  4
13
14 typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t;
15
16 typedef struct update_rec update_rec_t;
17
18 struct update_rec {
19     update_type_t type;
20     uint64_t value;
21     uint64_t version;
22     update_rec_t *next; // an earlier update
23 };
24
25 typedef struct write_rec {
26     void *key;
27     update_rec_t *rec; 
28 } write_rec_t;
29
30 struct txn {
31     uint64_t rv;
32     uint64_t wv;
33     map_t *map;
34     write_rec_t *writes;
35     uint32_t writes_size;
36     uint32_t writes_count;
37     uint32_t writes_scan;
38     txn_type_e type;
39     txn_state_e state;
40 };
41
42 static uint64_t version_ = 1;
43
44 static txn_state_e txn_validate (txn_t *txn);
45
46 static skiplist_t *active_ = NULL;
47
48 void txn_init (void) {
49     active_ = sl_alloc(NULL);
50 }
51
52 // Validate the updates for <key>. Validation fails if there is a write-write conflict. That is if after our 
53 // read version another transaction committed a change to an entry we are also trying to change.
54 //
55 // If we encounter a potential conflict with a transaction that is in the process of validating, we help it 
56 // complete validating. It must be finished before we can decide to rollback or commit.
57 //
58 static txn_state_e tm_validate_key (txn_t *txn, void *key) {
59     
60     update_rec_t *update = (update_rec_t *) map_get(txn->map, key);
61     for (; update != NULL; update = update->next) {
62
63         // If the update's version is not tagged it means the update is committed.
64         //
65         // We can stop at the first committed record we find that is at least as old as our read version. All 
66         // the other committed records following it will be older. And all the uncommitted records following it 
67         // will eventually conflict with it and abort.
68         if (!IS_TAGGED(update->version)) 
69             return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED;
70
71         // If the update's version is tagged then either the update was aborted or the the version number is 
72         // actually a pointer to a running transaction's txn_t.
73
74         // Skip aborted transactions.
75         if (EXPECT_FALSE(update->version == ABORTED_VERSION))
76             continue;
77
78         // The update's transaction is still in progress. Access its txn_t.
79         txn_t *writer = (txn_t *)STRIP_TAG(update->version);
80         if (writer == txn)
81             continue; // Skip our own updates.
82         txn_state_e writer_state = writer->state;
83
84         // Any running transaction will only be able to aquire a wv greater than ours. A transaction changes its 
85         // state to validating before aquiring a wv. We can ignore an unvalidated transaction if its version is
86         // greater than ours. See next comment below for why. 
87         if (writer_state == TXN_RUNNING)
88             continue; 
89         
90         // If <writer> has a later version than us we can safely ignore its updates. It will not commit until
91         // we have completed validation (in order to remain non-blocking it will help us validate if necessary). 
92         // This protocol ensures a deterministic resolution to every conflict and avoids infinite ping-ponging 
93         // between validating two conflicting transactions.
94         if (writer_state == TXN_VALIDATING) {
95             if (writer->wv > txn->wv)
96                 continue;
97             // Help <writer> commit. We need to know if <writer> aborts or commits before we can decide what to
98             // do. But we don't want to block, so we assist.
99             writer_state = txn_validate(writer);
100         }
101
102         // Skip updates from aborted transactions.
103         if (writer_state == TXN_ABORTED)
104             continue;
105
106         assert(writer_state == TXN_VALIDATED);
107         return (writer->wv <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED;
108     }
109
110     return TXN_VALIDATED;
111 }
112
113 static txn_state_e txn_validate (txn_t *txn) {
114     int i;
115     switch (txn->state) {
116
117         case TXN_VALIDATING:
118             if (txn->wv == UNDETERMINED_VERSION) {
119                 uint64_t wv = SYNC_ADD(&version_, 1);
120                 SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv);
121             }
122
123             for (i = 0; i < txn->writes_count; ++i) {
124                 txn_state_e s = tm_validate_key(txn, txn->writes[i].key);
125                 if (s == TXN_ABORTED) {
126                     txn->state = TXN_ABORTED;
127                     break;
128                 }
129             }
130             if (txn->state == TXN_VALIDATING) {
131                 txn->state =  TXN_VALIDATED;
132             }
133             break;
134
135         case TXN_VALIDATED:
136         case TXN_ABORTED:
137             break;
138
139         default:
140             assert(FALSE);
141     }
142
143     return txn->state;
144 }
145
146 static update_rec_t *alloc_update_rec (void) {
147     update_rec_t *u = (update_rec_t *)nbd_malloc(sizeof(update_rec_t));
148     memset(u, 0, sizeof(update_rec_t));
149     return u;
150 }
151
152 txn_t *txn_begin (txn_type_e type, map_t *map) {
153     txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t));
154     memset(txn, 0, sizeof(txn_t));
155     txn->type = type;
156     txn->wv = UNDETERMINED_VERSION;
157     txn->state = TXN_RUNNING;
158     txn->map = map;
159     if (type != TXN_READ_ONLY) {
160         txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE);
161         txn->writes_size = INITIAL_WRITES_SIZE;
162     }
163
164     // aquire the read version for txn. must be careful to avoid a race
165     do {
166         txn->rv = version_;
167
168         uint64_t old_count;
169         uint64_t temp = 0;
170         do {
171             old_count = temp;
172             temp = (uint64_t)sl_cas(active_, (void *)txn->rv, old_count, old_count + 1);
173         } while (temp != old_count);
174
175         if (txn->rv == version_)
176             break;
177
178         temp = 1;
179         do {
180             old_count = temp;
181             temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
182         } while (temp != old_count);
183     } while (1);
184
185     return txn;
186 }
187
188 void txn_abort (txn_t *txn) {
189
190     int i;
191     for (i = 0; i < txn->writes_count; ++i) {
192         update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
193         update->version = ABORTED_VERSION;
194     }
195
196     nbd_defer_free(txn->writes);
197     nbd_defer_free(txn);
198 }
199
200 txn_state_e txn_commit (txn_t *txn) {
201
202     assert(txn->state == TXN_RUNNING);
203     txn->state = TXN_VALIDATING;
204     txn_state_e state = txn_validate(txn);
205
206     // Detach <txn> from its updates.
207     uint64_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv;
208     int i;
209     for (i = 0; i < txn->writes_count; ++i) {
210         update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
211         update->version = wv;
212     }
213
214     // Lower the reference count for <txn>'s read version
215     uint64_t temp = 2;
216     uint64_t old_count;
217     do {
218         old_count = temp;
219         temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
220         if (temp == 1 && txn->rv != version_) {
221             sl_remove(active_, (void *)txn->rv);
222             break;
223         }
224     } while (old_count != temp);
225
226     nbd_defer_free(txn->writes);
227     nbd_defer_free(txn);
228
229     return state;
230 }
231
232 // Get most recent committed version prior to our read version.
233 uint64_t tm_get (txn_t *txn, void *key) {
234
235     // Iterate through update records associated with <key> to find the latest committed version prior to our
236     // read version. 
237     update_rec_t *update = (update_rec_t *) map_get(txn->map, key);
238     for (; update != NULL; update = update->next) {
239
240         // If the update's version is not tagged it means the update is committed.
241         if (!IS_TAGGED(update->version)) {
242             if (update->version <= txn->rv)
243                 break; // success
244             continue;
245         }
246
247         // If the update's version is tagged then either the update was aborted or the the version number is 
248         // actually a pointer to a running transaction's txn_t.
249
250         // Skip updates from aborted transactions.
251         if (EXPECT_FALSE(update->version == ABORTED_VERSION))
252             continue;
253
254         // The update's transaction is still in progress. Access its txn_t.
255         txn_t *writer = (txn_t *)STRIP_TAG(update->version);
256         if (writer == txn) // found our own update
257             break; // success 
258
259         txn_state_e writer_state = writer->state;
260         if (writer_state == TXN_RUNNING)
261             continue; 
262
263         if (writer_state == TXN_VALIDATING) {
264             if (writer->wv > txn->rv)
265                 continue;
266             writer_state = txn_validate(writer);
267         }
268
269         // Skip updates from aborted transactions.
270         if (writer_state == TXN_ABORTED)
271             continue;
272
273         assert(writer_state == TXN_VALIDATED);
274         if (writer->wv > txn->rv)
275             continue;
276         break; // success
277     }
278
279     if (EXPECT_FALSE(update == NULL))
280         return DOES_NOT_EXIST;
281
282     // collect some garbage
283     update_rec_t *next = update->next;
284     if (next != NULL) {
285         uint64_t min_active_version = (uint64_t)sl_min_key(active_);
286         if (next->version < min_active_version) {
287             next = SYNC_SWAP(&update->next, NULL);
288             while (next != NULL) {
289                 update = next;
290                 next = NULL;
291                 if (update->next != NULL) {
292                     next = SYNC_SWAP(&update->next, NULL);
293                 }
294                 nbd_free(update);
295             }
296         }
297     }
298     
299     return update->value;
300 }
301
302 void tm_set (txn_t *txn, void *key, uint64_t value) {
303
304     // create a new update record
305     update_rec_t *update = alloc_update_rec();
306     update->type = UPDATE_TYPE_PUT;
307     update->value = value;
308     update->version = TAG_VALUE((uint64_t)txn);
309
310     // push the new update record onto <key>'s update list
311     uint64_t update_prev;
312     do {
313         update->next = (update_rec_t *) map_get(txn->map, key);
314         update_prev = (uint64_t)update->next;
315     } while (map_cas(txn->map, key, update_prev, (uint64_t)update) != update_prev);
316
317     // add <key> to the write set for commit-time validation
318     if (txn->writes_count == txn->writes_size) {
319         write_rec_t *w = nbd_malloc(sizeof(write_rec_t) * txn->writes_size * 2);
320         memcpy(w, txn->writes, txn->writes_size * sizeof(write_rec_t));
321         txn->writes_size *= 2;
322     }
323     int i = txn->writes_count++;
324     txn->writes[i].key = key;
325     txn->writes[i].rec = update;
326 }