refactor header files
[nbds] / txn / txn.c
1 /* 
2  * Written by Josh Dybnis and released to the public domain, as explained at
3  * http://creativecommons.org/licenses/publicdomain
4  */
5 #include "common.h"
6 #include "txn.h"
7 #include "mem.h"
8 #include "skiplist.h"
9
10 #define UNDETERMINED_VERSION 0
11 #define INITIAL_WRITES_SIZE  4
12
13 typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t;
14
15 typedef struct update_rec update_rec_t;
16
17 struct update_rec {
18     update_type_t type;
19     uint64_t value;
20     uint64_t version;
21     update_rec_t *prev; // a previous update
22 };
23
24 typedef struct write_rec {
25     void *key;
26     update_rec_t *rec; 
27 } write_rec_t;
28
29 struct txn {
30     uint64_t rv;
31     uint64_t wv;
32     map_t *map;
33     write_rec_t *writes;
34     uint32_t writes_size;
35     uint32_t writes_count;
36     uint32_t writes_scan;
37     txn_type_e type;
38     txn_state_e state;
39 };
40
41 static uint64_t version_ = 1;
42
43 static txn_state_e txn_validate (txn_t *txn);
44
45 static map_t *active_ = NULL;
46
47 void txn_init (void) {
48     active_ = map_alloc(MAP_TYPE_SKIPLIST, NULL);
49 }
50
51 // Validate the updates for <key>. Validation fails for a key we have written to if there is a 
52 // write committed newer than our read version.
53 static txn_state_e tm_validate_key (txn_t *txn, void *key) {
54     
55     update_rec_t *update = (update_rec_t *) map_get(txn->map, key);
56     for (; update != NULL; update = update->prev) {
57         uint64_t writer_version = update->version;
58         if (writer_version <= txn->rv)
59             return TXN_VALIDATED;
60
61         // If the version is tagged, it means it is a pointer to a transaction in progress.
62         if (IS_TAGGED(writer_version)) {
63
64             // Skip aborted transactions.
65             if (EXPECT_FALSE(writer_version == TAG_VALUE(0)))
66                 continue;
67
68             // Skip our own updates.
69             txn_t *writer = (txn_t *)STRIP_TAG(writer_version);
70             if (writer == txn)
71                 continue;
72
73             writer_version = writer->wv;
74             if (writer_version <= txn->rv && writer_version != UNDETERMINED_VERSION)
75                 return TXN_VALIDATED;
76
77             txn_state_e writer_state = writer->state;
78             if (EXPECT_FALSE(writer_state == TXN_ABORTED))
79                 continue;
80
81             // Help validate <writer> if it is a committing transaction that might cause us to 
82             // abort. However, if the <writer> has a later version than us we can safely ignore its
83             // updates. This protocol ensures a deterministic resolution to every conflict, and 
84             // avoids infinite ping-ponging between validating two conflicting transactions.
85             if (writer_state == TXN_VALIDATING && (writer_version < txn->wv || 
86                                                    writer_version == UNDETERMINED_VERSION)) {
87                 writer_state = txn_validate(writer);
88             }
89
90             if (writer_state == TXN_VALIDATED)
91                 return TXN_ABORTED;
92         }
93
94         return TXN_ABORTED;
95     }
96
97     return TXN_VALIDATED;
98 }
99
100 static txn_state_e txn_validate (txn_t *txn) {
101     int i;
102     switch (txn->state) {
103
104         case TXN_VALIDATING:
105             if (txn->wv == UNDETERMINED_VERSION) {
106                 uint64_t wv = SYNC_ADD(&version_, 1);
107                 SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv);
108             }
109
110             for (i = 0; i < txn->writes_count; ++i) {
111                 txn_state_e s = tm_validate_key(txn, txn->writes[i].key);
112                 if (s == TXN_ABORTED) {
113                     txn->state = TXN_ABORTED;
114                     break;
115                 }
116             }
117             if (txn->state == TXN_VALIDATING) {
118                 txn->state =  TXN_VALIDATED;
119             }
120             break;
121
122         case TXN_VALIDATED:
123         case TXN_ABORTED:
124             break;
125
126         default:
127             assert(FALSE);
128     }
129
130     return txn->state;
131 }
132
133 static update_rec_t *alloc_update_rec (void) {
134     update_rec_t *u = (update_rec_t *)nbd_malloc(sizeof(update_rec_t));
135     memset(u, 0, sizeof(update_rec_t));
136     return u;
137 }
138
139 txn_t *txn_begin (txn_type_e type, map_t *map) {
140     txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t));
141     memset(txn, 0, sizeof(txn_t));
142     txn->type = type;
143     txn->wv = UNDETERMINED_VERSION;
144     txn->state = TXN_RUNNING;
145     txn->map = map;
146     if (type != TXN_READ_ONLY) {
147         txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE);
148         txn->writes_size = INITIAL_WRITES_SIZE;
149     }
150
151     // aquire the read version for txn.
152     do {
153         txn->rv = version_;
154
155         uint64_t old_count;
156         uint64_t temp = 0;
157         do {
158             old_count = temp;
159             temp = (uint64_t)map_cas(active_, (void *)txn->rv, old_count, old_count + 1);
160         } while (temp != old_count);
161
162         if (txn->rv == version_)
163             break;
164
165         temp = 1;
166         do {
167             old_count = temp;
168             temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1);
169         } while (temp != old_count);
170     } while (1);
171
172     return txn;
173 }
174
175 void txn_abort (txn_t *txn) {
176
177     int i;
178     for (i = 0; i < txn->writes_count; ++i) {
179         update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
180         update->version = TAG_VALUE(0);
181     }
182
183     nbd_defer_free(txn->writes);
184     nbd_defer_free(txn);
185 }
186
187 txn_state_e txn_commit (txn_t *txn) {
188
189     assert(txn->state == TXN_RUNNING);
190     txn->state = TXN_VALIDATING;
191     txn_state_e state = txn_validate(txn);
192
193     // Detach <txn> from its updates.
194     uint64_t wv = (txn->state == TXN_ABORTED) ? TAG_VALUE(0) : txn->wv;
195     int i;
196     for (i = 0; i < txn->writes_count; ++i) {
197         update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
198         update->version = wv;
199     }
200
201     nbd_defer_free(txn->writes);
202     nbd_defer_free(txn);
203
204     return state;
205 }
206
207 // Get most recent committed version prior to our read version.
208 uint64_t tm_get (txn_t *txn, void *key) {
209
210     // Iterate through update records associated with <key> to find the latest committed version. 
211     // We can use the first matching version. Older updates always come later in the list.
212     update_rec_t *update = (update_rec_t *) map_get(txn->map, key);
213     for (; update != NULL; update = update->prev) {
214         uint64_t writer_version = update->version;
215         if (writer_version < txn->rv)
216             return update->value;
217
218         // If the version is tagged, it means that it is not a version number, but a pointer to an
219         // in progress transaction.
220         if (IS_TAGGED(update->version)) {
221             txn_t *writer = (txn_t *)STRIP_TAG(writer_version);
222
223             if (writer == txn)
224                 return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
225
226             // Skip updates from aborted transactions.
227             txn_state_e writer_state = writer->state;
228             if (EXPECT_FALSE(writer_state == TXN_ABORTED))
229                 continue;
230
231             if (writer_state == TXN_VALIDATING) {
232                 writer_state = txn_validate(writer);
233             }
234
235             if (writer_state == TXN_VALIDATED && writer->wv <= txn->rv && writer->wv != UNDETERMINED_VERSION)
236                 return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
237         }
238     }
239     return DOES_NOT_EXIST;
240 }
241
242 void tm_set (txn_t *txn, void *key, uint64_t value) {
243
244     // create a new update record
245     update_rec_t *update = alloc_update_rec();
246     update->type = UPDATE_TYPE_PUT;
247     update->value = value;
248     update->version = TAG_VALUE((uint64_t)txn);
249
250     // push the new update record onto <key>'s update list
251     uint64_t update_prev;
252     do {
253         update->prev = (update_rec_t *) map_get(txn->map, key);
254         update_prev = (uint64_t)update->prev;
255     } while (map_cas(txn->map, key, update_prev, (uint64_t)update) != update_prev);
256
257     // add <key> to the write set for commit-time validation
258     if (txn->writes_count == txn->writes_size) {
259         write_rec_t *w = nbd_malloc(sizeof(write_rec_t) * txn->writes_size * 2);
260         memcpy(w, txn->writes, txn->writes_size * sizeof(write_rec_t));
261         txn->writes_size *= 2;
262     }
263     int i = txn->writes_count++;
264     txn->writes[i].key = key;
265     txn->writes[i].rec = update;
266 }