all structures now support arbitrary type keys with a fast path for integers
[nbds] / txn / txn.c
1 /* 
2  * Written by Josh Dybnis and released to the public domain, as explained at
3  * http://creativecommons.org/licenses/publicdomain
4  */
5 #include "common.h"
6 #include "txn.h"
7 #include "mem.h"
8
9 #define UNDETERMINED_VERSION 0
10 #define INITIAL_WRITES_SIZE  4
11
12 typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t;
13
14 typedef struct update_rec update_rec_t;
15
16 struct update_rec {
17     update_type_t type;
18     uint64_t value;
19     uint64_t version;
20     update_rec_t *prev; // a previous update
21 };
22
23 typedef struct write_rec {
24     void *key;
25     update_rec_t *rec; 
26 } write_rec_t;
27
28 struct txn {
29     uint64_t rv;
30     uint64_t wv;
31     map_t *map;
32     write_rec_t *writes;
33     uint32_t writes_size;
34     uint32_t writes_count;
35     uint32_t writes_scan;
36     txn_access_e access;
37     txn_isolation_e isolation;
38     txn_state_e state;
39 };
40
41 static txn_state_e txn_validate (txn_t *txn);
42
43 static uint64_t version_ = 1;
44
45 // Validate the updates for <key>. Validation fails for a key we have written to if there is a 
46 // write committed newer than our read version.
47 static txn_state_e tm_validate_key (txn_t *txn, void *key) {
48     
49     update_rec_t *update = (update_rec_t *) map_get(txn->map, key);
50     for (; update != NULL; update = update->prev) {
51         uint64_t writer_version = update->version;
52         if (writer_version <= txn->rv)
53             return TXN_VALIDATED;
54
55         // If the version is tagged, it means it is a pointer to a transaction in progress.
56         if (IS_TAGGED(writer_version)) {
57
58             // Skip aborted transactions.
59             if (EXPECT_FALSE(writer_version == TAG_VALUE(0)))
60                 continue;
61
62             // Skip our own updates.
63             txn_t *writer = (txn_t *)STRIP_TAG(writer_version);
64             if (writer == txn)
65                 continue;
66
67             writer_version = writer->wv;
68             if (writer_version <= txn->rv && writer_version != UNDETERMINED_VERSION)
69                 return TXN_VALIDATED;
70
71             txn_state_e writer_state = writer->state;
72             if (EXPECT_FALSE(writer_state == TXN_ABORTED))
73                 continue;
74
75             // Help validate <writer> if it is a committing transaction that might cause us to 
76             // abort. However, if the <writer> has a later version than us we can safely ignore its
77             // updates. This protocol ensures a deterministic resolution to every conflict, and 
78             // avoids infinite ping-ponging between validating two conflicting transactions.
79             if (writer_state == TXN_VALIDATING && (writer_version < txn->wv || 
80                                                    writer_version == UNDETERMINED_VERSION)) {
81                 writer_state = txn_validate(writer);
82             }
83
84             if (writer_state == TXN_VALIDATED)
85                 return TXN_ABORTED;
86         }
87
88         return TXN_ABORTED;
89     }
90
91     return TXN_VALIDATED;
92 }
93
94 static txn_state_e txn_validate (txn_t *txn) {
95     int i;
96     switch (txn->state) {
97
98         case TXN_VALIDATING:
99             if (txn->wv == UNDETERMINED_VERSION) {
100                 uint64_t wv = SYNC_ADD(&version_, 1);
101                 SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv);
102             }
103
104             for (i = 0; i < txn->writes_count; ++i) {
105                 txn_state_e s = tm_validate_key(txn, txn->writes[i].key);
106                 if (s == TXN_ABORTED) {
107                     txn->state = TXN_ABORTED;
108                     break;
109                 }
110             }
111             if (txn->state == TXN_VALIDATING) {
112                 txn->state =  TXN_VALIDATED;
113             }
114             break;
115
116         case TXN_VALIDATED:
117         case TXN_ABORTED:
118             break;
119
120         default:
121             assert(FALSE);
122     }
123
124     return txn->state;
125 }
126
127 static update_rec_t *alloc_update_rec (void) {
128     update_rec_t *u = (update_rec_t *)nbd_malloc(sizeof(update_rec_t));
129     memset(u, 0, sizeof(update_rec_t));
130     return u;
131 }
132
133 txn_t *txn_begin (txn_access_e access, txn_isolation_e isolation, map_t *map) {
134     txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t));
135     memset(txn, 0, sizeof(txn_t));
136     txn->access = access;
137     txn->isolation = isolation;
138     txn->rv = version_;
139     txn->wv = UNDETERMINED_VERSION;
140     txn->state = TXN_RUNNING;
141     txn->map = map;
142     if (isolation != TXN_READ_ONLY) {
143         txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE);
144         txn->writes_size = INITIAL_WRITES_SIZE;
145     }
146     return txn;
147 }
148
149 void txn_abort (txn_t *txn) {
150
151     int i;
152     for (i = 0; i < txn->writes_count; ++i) {
153         update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
154         update->version = TAG_VALUE(0);
155     }
156
157     nbd_defer_free(txn->writes);
158     nbd_defer_free(txn);
159 }
160
161 txn_state_e txn_commit (txn_t *txn) {
162
163     assert(txn->state == TXN_RUNNING);
164     txn->state = TXN_VALIDATING;
165     txn_state_e state = txn_validate(txn);
166
167     // Detach <txn> from its updates.
168     uint64_t wv = (txn->state == TXN_ABORTED) ? TAG_VALUE(0) : txn->wv;
169     int i;
170     for (i = 0; i < txn->writes_count; ++i) {
171         update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
172         update->version = wv;
173     }
174
175     nbd_defer_free(txn->writes);
176     nbd_defer_free(txn);
177
178     return state;
179 }
180
181 // Get most recent committed version prior to our read version.
182 uint64_t tm_get (txn_t *txn, void *key) {
183
184     // Iterate through update records associated with <key> to find the latest committed version. 
185     // We can use the first matching version. Older updates always come later in the list.
186     update_rec_t *update = (update_rec_t *) map_get(txn->map, key);
187     for (; update != NULL; update = update->prev) {
188         uint64_t writer_version = update->version;
189         if (writer_version < txn->rv)
190             return update->value;
191
192         // If the version is tagged, it means that it is not a version number, but a pointer to an
193         // in progress transaction.
194         if (IS_TAGGED(update->version)) {
195             txn_t *writer = (txn_t *)STRIP_TAG(writer_version);
196
197             if (writer == txn)
198                 return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
199
200             // Skip updates from aborted transactions.
201             txn_state_e writer_state = writer->state;
202             if (EXPECT_FALSE(writer_state == TXN_ABORTED))
203                 continue;
204
205             if (writer_state == TXN_VALIDATING) {
206                 writer_state = txn_validate(writer);
207             }
208
209             if (writer_state == TXN_VALIDATED && writer->wv <= txn->rv && writer->wv != UNDETERMINED_VERSION)
210                 return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
211         }
212     }
213     return DOES_NOT_EXIST;
214 }
215
216 void tm_set (txn_t *txn, void *key, uint64_t value) {
217
218     // create a new update record
219     update_rec_t *update = alloc_update_rec();
220     update->type = UPDATE_TYPE_PUT;
221     update->value = value;
222     update->version = TAG_VALUE((uint64_t)txn);
223
224     // push the new update record onto <key>'s update list
225     uint64_t update_prev;
226     do {
227         update->prev = (update_rec_t *) map_get(txn->map, key);
228         update_prev = (uint64_t)update->prev;
229     } while (map_cas(txn->map, key, update_prev, (uint64_t)update) != update_prev);
230
231     // add <key> to the write set for commit-time validation
232     if (txn->writes_count == txn->writes_size) {
233         write_rec_t *w = nbd_malloc(sizeof(write_rec_t) * txn->writes_size * 2);
234         memcpy(w, txn->writes, txn->writes_size * sizeof(write_rec_t));
235         txn->writes_size *= 2;
236     }
237     int i = txn->writes_count++;
238     txn->writes[i].key = key;
239     txn->writes[i].rec = update;
240 }