2 * Written by Josh Dybnis and released to the public domain, as explained at
3 * http://creativecommons.org/licenses/publicdomain
// Placeholder write version: txn_begin() stores it in txn->wv, and txn_validate() replaces it
// with a value drawn from the global version counter.
10 #define UNDETERMINED_VERSION 0
// Number of write records txn_begin() allocates up front for a transaction that is not
// TXN_READ_ONLY.
11 #define INITIAL_WRITES_SIZE 4
// Kind of change carried by an update record. tm_set() creates UPDATE_TYPE_PUT records;
// tm_get() reports DOES_NOT_EXIST when the record it selects is an UPDATE_TYPE_DELETE.
13 typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t;
15 typedef struct update_rec update_rec_t;
// Link followed by tm_validate_key() and tm_get() when they walk the update records of a key.
21 update_rec_t *prev; // a previous update
// Entry of a transaction's write set; tm_set() fills in its <key> and <rec> members.
24 typedef struct write_rec {
// Number of entries currently in use in a transaction's <writes> array (accessed as
// txn->writes_count by the functions below).
35 uint32_t writes_count;
// Global version counter; txn_validate() draws write versions from it with SYNC_ADD and
// txn_begin() compares a transaction's read version against it.
41 static uint64_t version_ = 1;
// Forward declaration: tm_validate_key() calls txn_validate() before its definition.
43 static txn_state_e txn_validate (txn_t *txn);
// Map allocated by txn_init(); txn_begin() maintains a count in it keyed by read version.
45 static map_t *active_ = NULL;
// Set up the module's file-scope state: allocates the map stored in active_ using the
// sl_map_impl map implementation. txn_begin() uses active_, so this must run before it.
47 void txn_init (void) {
48 active_ = map_alloc(&sl_map_impl, NULL);
51 // Validate the updates for <key>. Validation fails for a key we have written to if there is a
52 // write committed newer than our read version.
//
// txn: the transaction being validated; its <map>, <rv> and <wv> fields are read here.
// key: the key whose update records are walked from the current head through their <prev>
//      links (tm_set() pushes new records at the head, so newer records come first).
53 static txn_state_e tm_validate_key (txn_t *txn, void *key) {
// Start from the record currently stored for <key> in the transaction's map.
55 update_rec_t *update = (update_rec_t *) map_get(txn->map, key);
56 for (; update != NULL; update = update->prev) {
// Read the version field once; it holds either a plain version number or a tagged pointer.
57 uint64_t writer_version = update->version;
// A version no newer than our read version is not a conflict for us.
58 if (writer_version <= txn->rv)
61 // If the version is tagged, it means it is a pointer to a transaction in progress.
62 if (IS_TAGGED(writer_version)) {
64 // Skip aborted transactions.
// TAG_VALUE(0) is the marker txn_abort() and txn_commit() store in the records of an aborted
// transaction.
65 if (EXPECT_FALSE(writer_version == TAG_VALUE(0)))
68 // Skip our own updates.
// The untagged value is the txn_t pointer that tm_set() stored in the record.
69 txn_t *writer = (txn_t *)STRIP_TAG(writer_version);
// From here on compare against the writer's write version, which may still be undetermined.
73 writer_version = writer->wv;
// Same "not newer than our read version" test, applied only once the writer has a version.
74 if (writer_version <= txn->rv && writer_version != UNDETERMINED_VERSION)
// Snapshot the writer's state; an aborted writer is presumably skipped like the TAG_VALUE(0)
// case above.
77 txn_state_e writer_state = writer->state;
78 if (EXPECT_FALSE(writer_state == TXN_ABORTED))
81 // Help validate <writer> if it is a committing transaction that might cause us to
82 // abort. However, if the <writer> has a later version than us we can safely ignore its
83 // updates. This protocol ensures a deterministic resolution to every conflict, and
84 // avoids infinite ping-ponging between validating two conflicting transactions.
85 if (writer_state == TXN_VALIDATING && (writer_version < txn->wv ||
86 writer_version == UNDETERMINED_VERSION)) {
// Run the writer's validation ourselves and continue with the state it ends up in.
87 writer_state = txn_validate(writer);
// A validated writer that was not filtered out above is the conflict described in the header
// comment (presumably this is where validation of <txn> fails).
90 if (writer_state == TXN_VALIDATED)
// Validate the write set of <txn>; callers use the return value as the resulting state.
// Besides txn_commit(), tm_validate_key() and tm_get() call this on transactions they find in
// TXN_VALIDATING state, so <txn> is not necessarily the caller's own transaction.
100 static txn_state_e txn_validate (txn_t *txn) {
// What is done depends on the state the transaction is currently in.
102 switch (txn->state) {
// Assign a write version if none has been assigned yet: take a value from the global counter
// and install it with a compare-and-swap against UNDETERMINED_VERSION (presumably so that a
// version installed by another caller in the meantime is kept).
105 if (txn->wv == UNDETERMINED_VERSION) {
106 uint64_t wv = SYNC_ADD(&version_, 1);
107 SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv);
// Check every key in the write set; a single failing key aborts the transaction.
110 for (i = 0; i < txn->writes_count; ++i) {
111 txn_state_e s = tm_validate_key(txn, txn->writes[i].key);
112 if (s == TXN_ABORTED) {
113 txn->state = TXN_ABORTED;
// Only promote to TXN_VALIDATED if the state is still TXN_VALIDATING, i.e. it was not set to
// TXN_ABORTED above.
117 if (txn->state == TXN_VALIDATING) {
118 txn->state = TXN_VALIDATED;
// Allocate one update record with nbd_malloc() and zero-fill it; tm_set() uses the result as
// the new record it fills in and publishes.
133 static update_rec_t *alloc_update_rec (void) {
134 update_rec_t *u = (update_rec_t *)nbd_malloc(sizeof(update_rec_t));
135 memset(u, 0, sizeof(update_rec_t));
// Allocate and start a new transaction.
// type: any type other than TXN_READ_ONLY gets a write-set array of INITIAL_WRITES_SIZE
//       entries; a TXN_READ_ONLY transaction gets none here.
// map:  presumably the map the transaction operates on (read as txn->map elsewhere) — confirm.
// The transaction starts zero-filled, in TXN_RUNNING state, with an undetermined write version.
139 txn_t *txn_begin (txn_type_e type, map_t *map) {
140 txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t));
141 memset(txn, 0, sizeof(txn_t));
143 txn->wv = UNDETERMINED_VERSION;
144 txn->state = TXN_RUNNING;
146 if (type != TXN_READ_ONLY) {
147 txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE);
148 txn->writes_size = INITIAL_WRITES_SIZE;
151 // acquire the read version for txn.
// Increment the count stored in active_ under this read version, retrying until the
// compare-and-swap observes the count it expected.
159 temp = (uint64_t)map_cas(active_, (void *)txn->rv, old_count, old_count + 1);
160 } while (temp != old_count);
// Re-check the read version against the global counter after the count was incremented.
162 if (txn->rv == version_)
// Otherwise take the increment back with the same retry loop (presumably before trying again
// with a fresh read version).
168 temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1);
169 } while (temp != old_count);
// Abort <txn>: mark every update record in its write set with TAG_VALUE(0), the value
// tm_validate_key() treats as "aborted transaction", then hand the write-set array to
// nbd_defer_free().
175 void txn_abort (txn_t *txn) {
178 for (i = 0; i < txn->writes_count; ++i) {
179 update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
180 update->version = TAG_VALUE(0);
// The array is released through nbd_defer_free() rather than freed on the spot.
183 nbd_defer_free(txn->writes);
// Try to commit <txn>, which must be in TXN_RUNNING state. The transaction is moved to
// TXN_VALIDATING and validated; afterwards each of its update records gets a final version:
// the transaction's write version, or TAG_VALUE(0) if validation aborted it.
// Presumably returns the state produced by txn_validate() — confirm.
187 txn_state_e txn_commit (txn_t *txn) {
189 assert(txn->state == TXN_RUNNING);
// From this point other transactions that see TXN_VALIDATING may call txn_validate() on <txn>.
190 txn->state = TXN_VALIDATING;
191 txn_state_e state = txn_validate(txn);
193 // Detach <txn> from its updates.
// Replaces the tagged txn pointer stored by tm_set() with a plain value, so the records no
// longer refer to <txn>.
194 uint64_t wv = (txn->state == TXN_ABORTED) ? TAG_VALUE(0) : txn->wv;
196 for (i = 0; i < txn->writes_count; ++i) {
197 update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
198 update->version = wv;
// The write-set array is released through nbd_defer_free() rather than freed on the spot.
201 nbd_defer_free(txn->writes);
207 // Get most recent committed version prior to our read version.
//
// txn: the reading transaction; its <map> and <rv> fields are read here.
// key: the key to look up.
// Returns the value of the selected update record, or DOES_NOT_EXIST when no record qualifies
// or the selected record is an UPDATE_TYPE_DELETE.
208 uint64_t tm_get (txn_t *txn, void *key) {
210 // Iterate through update records associated with <key> to find the latest committed version.
211 // We can use the first matching version. Older updates always come later in the list.
212 update_rec_t *update = (update_rec_t *) map_get(txn->map, key);
213 for (; update != NULL; update = update->prev) {
// Read the version field once; it holds either a plain version number or a tagged pointer.
214 uint64_t writer_version = update->version;
// NOTE(review): this test is strict (<) while tm_validate_key() and the check further down use
// <= against rv, and this return does not map UPDATE_TYPE_DELETE to DOES_NOT_EXIST the way the
// two returns below do — confirm both are intended.
215 if (writer_version < txn->rv)
216 return update->value;
218 // If the version is tagged, it means that it is not a version number, but a pointer to an
219 // in progress transaction.
// NOTE(review): this re-reads update->version, while STRIP_TAG below uses the earlier
// snapshot in writer_version; txn_commit() may overwrite the field in between — confirm.
220 if (IS_TAGGED(update->version)) {
221 txn_t *writer = (txn_t *)STRIP_TAG(writer_version);
// A deleted key is reported as DOES_NOT_EXIST rather than by its stored value.
224 return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
226 // Skip updates from aborted transactions.
227 txn_state_e writer_state = writer->state;
228 if (EXPECT_FALSE(writer_state == TXN_ABORTED))
// A writer that is in the middle of committing is validated by us so its outcome is known.
231 if (writer_state == TXN_VALIDATING) {
232 writer_state = txn_validate(writer);
// The writer's update is visible only if it validated with a determined write version that is
// not newer than our read version.
235 if (writer_state == TXN_VALIDATED && writer->wv <= txn->rv && writer->wv != UNDETERMINED_VERSION)
236 return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
// No record in the list qualified.
239 return DOES_NOT_EXIST;
// Record a write of <value> to <key> on behalf of <txn>: a new UPDATE_TYPE_PUT record is pushed
// onto the head of <key>'s update list in txn->map and the (key, record) pair is appended to
// the transaction's write set for use at commit time.
242 void tm_set (txn_t *txn, void *key, uint64_t value) {
244 // create a new update record
245 update_rec_t *update = alloc_update_rec();
246 update->type = UPDATE_TYPE_PUT;
247 update->value = value;
// The version field holds a tagged pointer to the writing transaction until txn_commit() or
// txn_abort() replaces it.
248 update->version = TAG_VALUE((uint64_t)txn);
250 // push the new update record onto <key>'s update list
251 uint64_t update_prev;
// Link the new record in front of the current head, then publish it with a compare-and-swap;
// if the head changed in the meantime the attempt is repeated with the new head.
253 update->prev = (update_rec_t *) map_get(txn->map, key);
254 update_prev = (uint64_t)update->prev;
255 } while (map_cas(txn->map, key, update_prev, (uint64_t)update) != update_prev);
257 // add <key> to the write set for commit-time validation
// Double the capacity when the write-set array is full.
// NOTE(review): in the lines visible here the enlarged array <w> is never stored in
// txn->writes and the old array is not released; if that is also true of the full file, the
// stores below index past the end of the old array — confirm and assign txn->writes = w.
258 if (txn->writes_count == txn->writes_size) {
259 write_rec_t *w = nbd_malloc(sizeof(write_rec_t) * txn->writes_size * 2);
260 memcpy(w, txn->writes, txn->writes_size * sizeof(write_rec_t));
261 txn->writes_size *= 2;
// Claim the next free slot and remember which record this transaction wrote for <key>.
263 int i = txn->writes_count++;
264 txn->writes[i].key = key;
265 txn->writes[i].rec = update;