]> pd.if.org Git - nbds/blob - txn/txn.c
change the names of some variables and types for consistency across API's
[nbds] / txn / txn.c
1 /* 
2  * Written by Josh Dybnis and released to the public domain, as explained at
3  * http://creativecommons.org/licenses/publicdomain
4  */
5 #include "common.h"
6 #include "txn.h"
7 #include "mem.h"
8
9 #define UNDETERMINED_VERSION 0
10 #define INITIAL_WRITES_SIZE  4
11
12 typedef enum { TXN_RUNNING, TXN_VALIDATING, TXN_VALIDATED, TXN_ABORTED } txn_state_t;
13 typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t;
14
15 typedef struct update_rec update_rec_t;
16
17 struct update_rec {
18     update_type_t type;
19     uint64_t value;
20     uint64_t version;
21     update_rec_t *prev; // a previous update
22 };
23
24 typedef struct write_rec {
25     const char *key;
26     update_rec_t *rec; 
27 } write_rec_t;
28
29 struct txn {
30     uint64_t rv;
31     uint64_t wv;
32     hashtable_t *ht;
33     write_rec_t *writes;
34     uint32_t writes_size;
35     uint32_t writes_count;
36     uint32_t writes_scan;
37     txn_access_t access;
38     txn_isolation_t isolation;
39     txn_state_t state;
40 };
41
42 uint64_t GlobalVersion = 1;
43 uint64_t MinActiveTxnVersion = 0;
44
45 static txn_state_t txn_validate (txn_t *txn);
46
47 update_rec_t *alloc_update_rec (void) {
48     update_rec_t *u = (update_rec_t *)nbd_malloc(sizeof(update_rec_t));
49     memset(u, 0, sizeof(update_rec_t));
50     return u;
51 }
52
53 txn_t *txn_begin (txn_access_t access, txn_isolation_t isolation, hashtable_t *ht) {
54     txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t));
55     memset(txn, 0, sizeof(txn_t));
56     txn->access = access;
57     txn->isolation = isolation;
58     txn->rv = GlobalVersion;
59     txn->wv = UNDETERMINED_VERSION;
60     txn->state = TXN_RUNNING;
61     txn->ht = ht;
62     if (isolation != TXN_READ_ONLY) {
63         txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE);
64         txn->writes_size = INITIAL_WRITES_SIZE;
65     }
66     return txn;
67 }
68
69 // Get most recent committed version prior to our read version.
70 int64_t txn_ht_get (txn_t *txn, const char *key, uint32_t key_len) {
71
72     // Iterate through update records associated with <key> to find the latest committed version. 
73     // We can use the first matching version. Older updates always come later in the list.
74     update_rec_t *update = (update_rec_t *) ht_get(txn->ht, key, key_len);
75     for (; update != NULL; update = update->prev) {
76         uint64_t writer_version = update->version;
77         if (writer_version < txn->rv)
78             return update->value;
79
80         // If the version is tagged, it means that it is not a version number, but a pointer to an
81         // in progress transaction.
82         if (IS_TAGGED(update->version)) {
83             txn_t *writer = (txn_t *)STRIP_TAG(writer_version);
84
85             if (writer == txn)
86                 return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
87
88             // Skip updates from aborted transactions.
89             txn_state_t writer_state = writer->state;
90             if (EXPECT_FALSE(writer_state == TXN_ABORTED))
91                 continue;
92
93             if (writer_state == TXN_VALIDATING) {
94                 writer_state = txn_validate(writer);
95             }
96
97             if (writer_state == TXN_VALIDATED && writer->wv <= txn->rv && writer->wv != UNDETERMINED_VERSION)
98                 return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
99         }
100     }
101     return DOES_NOT_EXIST;
102 }
103
104 // Validate the updates for <key>. Validation fails for a key we have written to if there is a 
105 // write committed newer than our read version.
106 static txn_state_t txn_ht_validate_key (txn_t *txn, const char *key, uint32_t key_len) {
107     
108     update_rec_t *update = (update_rec_t *) ht_get(txn->ht, key, key_len);
109     for (; update != NULL; update = update->prev) {
110         uint64_t writer_version = update->version;
111         if (writer_version <= txn->rv)
112             return TXN_VALIDATED;
113
114         // If the version is tagged, it means it is a pointer to a transaction in progress.
115         if (IS_TAGGED(writer_version)) {
116
117             // Skip aborted transactions.
118             if (EXPECT_FALSE(writer_version == TAG_VALUE(0)))
119                 continue;
120
121             // Skip our own updates.
122             txn_t *writer = (txn_t *)STRIP_TAG(writer_version);
123             if (writer == txn)
124                 continue;
125
126             writer_version = writer->wv;
127             if (writer_version <= txn->rv && writer_version != UNDETERMINED_VERSION)
128                 return TXN_VALIDATED;
129
130             txn_state_t writer_state = writer->state;
131             if (EXPECT_FALSE(writer_state == TXN_ABORTED))
132                 continue;
133
134             // Help validate <writer> if it is a committing transaction that might cause us to 
135             // abort. However, if the <writer> has a later version than us we can safely ignore its
136             // updates. This protocol ensures a deterministic resolution to every conflict, and 
137             // avoids infinite ping-ponging between validating two conflicting transactions.
138             if (writer_state == TXN_VALIDATING && (writer_version < txn->wv || 
139                                                    writer_version == UNDETERMINED_VERSION)) {
140                 writer_state = txn_validate(writer);
141             }
142
143             if (writer_state == TXN_VALIDATED)
144                 return TXN_ABORTED;
145         }
146
147         return TXN_ABORTED;
148     }
149
150     return TXN_VALIDATED;
151 }
152
153 static txn_state_t txn_validate (txn_t *txn) {
154     int i;
155     switch (txn->state) {
156
157         case TXN_VALIDATING:
158             if (txn->wv == UNDETERMINED_VERSION) {
159                 uint64_t wv = SYNC_ADD(&GlobalVersion, 1);
160                 SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv);
161             }
162
163             for (i = 0; i < txn->writes_count; ++i) {
164                 txn_state_t s = txn_ht_validate_key(txn, txn->writes[i].key, strlen(txn->writes[i].key));
165                 if (s == TXN_ABORTED) {
166                     txn->state = TXN_ABORTED;
167                     break;
168                 }
169             }
170             if (txn->state == TXN_VALIDATING) {
171                 txn->state =  TXN_VALIDATED;
172             }
173             break;
174
175         case TXN_VALIDATED:
176         case TXN_ABORTED:
177             break;
178
179         default:
180             assert(FALSE);
181     }
182
183     return txn->state;
184 }
185
186 void txn_abort (txn_t *txn) {
187
188     int i;
189     for (i = 0; i < txn->writes_count; ++i) {
190         update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
191         update->version = TAG_VALUE(0);
192     }
193
194     nbd_defer_free(txn->writes);
195     nbd_defer_free(txn);
196 }
197
198 txn_state_t txn_commit (txn_t *txn) {
199
200     assert(txn->state == TXN_RUNNING);
201     txn->state = TXN_VALIDATING;
202     txn_state_t state = txn_validate(txn);
203
204     // Detach <txn> from its updates.
205     uint64_t wv = (txn->state == TXN_ABORTED) ? TAG_VALUE(0) : txn->wv;
206     int i;
207     for (i = 0; i < txn->writes_count; ++i) {
208         update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
209         update->version = wv;
210     }
211
212     nbd_defer_free(txn->writes);
213     nbd_defer_free(txn);
214
215     return state;
216 }
217
218 void txn_ht_put (txn_t *txn, const char *key, uint32_t key_len, int64_t value) {
219
220     // create a new update record
221     update_rec_t *update = alloc_update_rec();
222     update->type = UPDATE_TYPE_PUT;
223     update->value = value;
224     update->version = TAG_VALUE((uint64_t)txn);
225
226     // push the new update record onto <key>'s update list
227     int64_t update_prev;
228     do {
229         update->prev = (update_rec_t *) ht_get(txn->ht, key, key_len);
230         update_prev = (int64_t)update->prev;
231     } while (ht_compare_and_set(txn->ht, key, key_len, update_prev, (int64_t)update) != update_prev);
232
233     // add <key> to the write set for commit-time validation
234     if (txn->writes_count == txn->writes_size) {
235         write_rec_t *w = nbd_malloc(sizeof(write_rec_t) * txn->writes_size * 2);
236         memcpy(w, txn->writes, txn->writes_size * sizeof(write_rec_t));
237         txn->writes_size *= 2;
238     }
239     int i = txn->writes_count++;
240     txn->writes[i].key = key;
241     txn->writes[i].rec = update;
242 }
243
244 #ifdef MAKE_txn_test
245 #include "runtime.h"
246 int main (void) {
247     nbd_init();
248     return 0;
249 }
250 #endif//txn_test