2 * Written by Josh Dybnis and released to the public domain, as explained at
3 * http://creativecommons.org/licenses/publicdomain
// Sentinel for txn->wv meaning "no write version assigned yet"; txn_begin() stores it and
// txn_validate() replaces it with a real version.
9 #define UNDETERMINED_VERSION 0
// Number of write_rec_t slots allocated for txn->writes in txn_begin().
10 #define INITIAL_WRITES_SIZE 4
// Kind of change an update record carries. tm_set() creates UPDATE_TYPE_PUT records; tm_get()
// reports DOES_NOT_EXIST for UPDATE_TYPE_DELETE records.
12 typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t;
14 typedef struct update_rec update_rec_t;
20 update_rec_t *prev; // a previous update
// One entry of a transaction's write set; tm_set() fills in its <key> and <rec> members.
23 typedef struct write_rec {
// Number of entries of txn->writes currently in use.
34 uint32_t writes_count;
37 txn_isolation_e isolation;
// Forward declaration: tm_validate_key() calls txn_validate() before its definition.
41 static txn_state_e txn_validate (txn_t *txn);
// Global version counter; txn_validate() advances it with SYNC_ADD to obtain write versions.
43 static uint64_t version_ = 1;
45 // Validate the updates for <key>. Validation fails for a key we have written to if there is a
46 // write committed newer than our read version.
//
// Walks <key>'s update list, newest first, comparing each update's version (or the write version
// of the transaction that owns it) against txn->rv.
// NOTE(review): the statements guarded by the bare `if` tests below, and the function's return
// statements, are not visible in this listing; the comments describe only the visible tests.
47 static txn_state_e tm_validate_key (txn_t *txn, void *key) {
// The map stores the head of the update list for <key>; older updates are reached through ->prev.
49 update_rec_t *update = (update_rec_t *) map_get(txn->map, key);
50 for (; update != NULL; update = update->prev) {
// Read the version field once; the checks below work on this snapshot.
51 uint64_t writer_version = update->version;
// An update no newer than our read version.
52 if (writer_version <= txn->rv)
55 // If the version is tagged, it means it is a pointer to a transaction in progress.
56 if (IS_TAGGED(writer_version)) {
58 // Skip aborted transactions.
// TAG_VALUE(0) is the marker txn_abort() and txn_commit() store into updates of aborted
// transactions.
59 if (EXPECT_FALSE(writer_version == TAG_VALUE(0)))
62 // Skip our own updates.
63 txn_t *writer = (txn_t *)STRIP_TAG(writer_version);
// From here on writer_version holds the owning transaction's write version instead of the
// tagged pointer.
67 writer_version = writer->wv;
// UNDETERMINED_VERSION is 0 and would otherwise satisfy the <= test, so it is excluded
// explicitly.
68 if (writer_version <= txn->rv && writer_version != UNDETERMINED_VERSION)
// Snapshot the writer's state; the tests below use this local copy.
71 txn_state_e writer_state = writer->state;
72 if (EXPECT_FALSE(writer_state == TXN_ABORTED))
75 // Help validate <writer> if it is a committing transaction that might cause us to
76 // abort. However, if the <writer> has a later version than us we can safely ignore its
77 // updates. This protocol ensures a deterministic resolution to every conflict, and
78 // avoids infinite ping-ponging between validating two conflicting transactions.
79 if (writer_state == TXN_VALIDATING && (writer_version < txn->wv ||
80 writer_version == UNDETERMINED_VERSION)) {
// Continue with the state reported by txn_validate() for the writer.
81 writer_state = txn_validate(writer);
// NOTE(review): presumably a validated conflicting writer causes this key to fail validation;
// the guarded statement is not shown here -- confirm.
84 if (writer_state == TXN_VALIDATED)
// Validate <txn>: make sure it has a write version, then run tm_validate_key() on every key in
// its write set. Also called on other transactions by tm_validate_key() and tm_get().
94 static txn_state_e txn_validate (txn_t *txn) {
// Assign a write version if none has been chosen yet. The CAS only installs <wv> while txn->wv
// is still UNDETERMINED_VERSION, so a version installed by another caller in the meantime is
// left in place.
99 if (txn->wv == UNDETERMINED_VERSION) {
100 uint64_t wv = SYNC_ADD(&version_, 1);
101 SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv);
// Check each key recorded by tm_set().
104 for (i = 0; i < txn->writes_count; ++i) {
105 txn_state_e s = tm_validate_key(txn, txn->writes[i].key);
// A single failing key aborts the whole transaction.
106 if (s == TXN_ABORTED) {
107 txn->state = TXN_ABORTED;
// Only a transaction still in TXN_VALIDATING is promoted; one marked TXN_ABORTED above keeps
// that state.
111 if (txn->state == TXN_VALIDATING) {
112 txn->state = TXN_VALIDATED;
// Allocate an update record with nbd_malloc() and zero all of its bytes.
// NOTE(review): the nbd_malloc() result is passed to memset() without a NULL check -- confirm
// that nbd_malloc() cannot return NULL.
127 static update_rec_t *alloc_update_rec (void) {
128 update_rec_t *u = (update_rec_t *)nbd_malloc(sizeof(update_rec_t));
129 memset(u, 0, sizeof(update_rec_t));
// Create a transaction: allocate and zero a txn_t, record <access> and <isolation>, and start it
// in TXN_RUNNING with no write version assigned.
133 txn_t *txn_begin (txn_access_e access, txn_isolation_e isolation, map_t *map) {
134 txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t));
// Zeroing leaves every field not set below (e.g. writes_count) at 0.
135 memset(txn, 0, sizeof(txn_t));
136 txn->access = access;
137 txn->isolation = isolation;
139 txn->wv = UNDETERMINED_VERSION;
140 txn->state = TXN_RUNNING;
// Allocate the initial write set.
// NOTE(review): this compares <isolation> (a txn_isolation_e) with TXN_READ_ONLY, which by its
// name looks like an access-mode constant -- confirm whether <access> was intended here.
142 if (isolation != TXN_READ_ONLY) {
143 txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE);
144 txn->writes_size = INITIAL_WRITES_SIZE;
// Abort <txn>: mark every update record in its write set as aborted, then release the write set.
149 void txn_abort (txn_t *txn) {
152 for (i = 0; i < txn->writes_count; ++i) {
153 update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
// TAG_VALUE(0) is the marker tm_validate_key() tests for when skipping aborted updates.
154 update->version = TAG_VALUE(0);
// NOTE(review): txn_begin() allocates txn->writes only inside a conditional, so it may still be
// NULL here -- confirm nbd_defer_free() accepts NULL.
157 nbd_defer_free(txn->writes);
// Try to commit <txn>. The transaction must be in TXN_RUNNING; it is moved to TXN_VALIDATING and
// validated, and then every update it wrote is stamped with the outcome.
161 txn_state_e txn_commit (txn_t *txn) {
163 assert(txn->state == TXN_RUNNING);
164 txn->state = TXN_VALIDATING;
165 txn_state_e state = txn_validate(txn);
167 // Detach <txn> from its updates.
// Each update's version held a tagged pointer to <txn> (set in tm_set()); it is overwritten with
// the aborted marker TAG_VALUE(0) if validation failed, or with the write version otherwise.
168 uint64_t wv = (txn->state == TXN_ABORTED) ? TAG_VALUE(0) : txn->wv;
170 for (i = 0; i < txn->writes_count; ++i) {
171 update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
172 update->version = wv;
// The write set is no longer needed once the updates are stamped.
175 nbd_defer_free(txn->writes);
181 // Get most recent committed version prior to our read version.
// Returns the value of the first qualifying update for <key>, or DOES_NOT_EXIST if none qualifies.
182 uint64_t tm_get (txn_t *txn, void *key) {
184 // Iterate through update records associated with <key> to find the latest committed version.
185 // We can use the first matching version. Older updates always come later in the list.
186 update_rec_t *update = (update_rec_t *) map_get(txn->map, key);
187 for (; update != NULL; update = update->prev) {
188 uint64_t writer_version = update->version;
// NOTE(review): this return hands back update->value without consulting update->type, unlike
// the returns below that map UPDATE_TYPE_DELETE to DOES_NOT_EXIST; it also uses `<` where the
// later comparison against txn->rv uses `<=`. Confirm both are intended.
189 if (writer_version < txn->rv)
190 return update->value;
192 // If the version is tagged, it means that it is not a version number, but a pointer to an
193 // in progress transaction.
// NOTE(review): this re-reads update->version instead of using the <writer_version> snapshot
// taken above, so the two reads could differ if the field changes in between -- confirm.
// NOTE(review): no test for the aborted marker TAG_VALUE(0) is visible here (tm_validate_key()
// has one); if it is absent, <writer> would be derived from a version that carries no
// transaction pointer -- confirm.
194 if (IS_TAGGED(update->version)) {
195 txn_t *writer = (txn_t *)STRIP_TAG(writer_version);
// NOTE(review): the condition guarding this return is not visible in this listing.
198 return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
200 // Skip updates from aborted transactions.
201 txn_state_e writer_state = writer->state;
202 if (EXPECT_FALSE(writer_state == TXN_ABORTED))
// Validate a writer that is in TXN_VALIDATING and continue with the state txn_validate() reports.
205 if (writer_state == TXN_VALIDATING) {
206 writer_state = txn_validate(writer);
// The writer's update counts only if it validated with a determined write version no newer than
// our read version.
209 if (writer_state == TXN_VALIDATED && writer->wv <= txn->rv && writer->wv != UNDETERMINED_VERSION)
210 return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
// No update in the list qualified.
213 return DOES_NOT_EXIST;
// Record a put of <value> for <key> on behalf of <txn>: push a new update record onto the
// key's update list and remember it in the transaction's write set.
216 void tm_set (txn_t *txn, void *key, uint64_t value) {
218 // create a new update record
219 update_rec_t *update = alloc_update_rec();
220 update->type = UPDATE_TYPE_PUT;
221 update->value = value;
// The version field holds a tagged pointer to <txn>; tm_validate_key() and tm_get() strip the
// tag to reach the owning transaction.
222 update->version = TAG_VALUE((uint64_t)txn);
224 // push the new update record onto <key>'s update list
225 uint64_t update_prev;
// Link the new record in front of the current head, then try to swap it in; repeat while
// map_cas() reports a value other than the head that was read.
227 update->prev = (update_rec_t *) map_get(txn->map, key);
228 update_prev = (uint64_t)update->prev;
229 } while (map_cas(txn->map, key, update_prev, (uint64_t)update) != update_prev);
231 // add <key> to the write set for commit-time validation
// Write set is full: allocate an array of twice the size and copy the existing entries into it.
// NOTE(review): no visible line stores <w> into txn->writes or releases the old array. If that
// is not done in a line missing from this listing, the stores below index past the end of the
// old array once writes_count reaches the old writes_size -- confirm.
232 if (txn->writes_count == txn->writes_size) {
233 write_rec_t *w = nbd_malloc(sizeof(write_rec_t) * txn->writes_size * 2);
234 memcpy(w, txn->writes, txn->writes_size * sizeof(write_rec_t));
235 txn->writes_size *= 2;
// Append the (key, update record) pair.
237 int i = txn->writes_count++;
238 txn->writes[i].key = key;
239 txn->writes[i].rec = update;