2 * Written by Josh Dybnis and released to the public domain, as explained at
3 * http://creativecommons.org/licenses/publicdomain
// Sentinel held in a transaction's write version (wv) from txn_begin() until txn_validate()
// assigns a real version.
9 #define UNDETERMINED_VERSION 0
// Number of write-set slots txn_begin() allocates up front; txn_ht_put() grows from there.
10 #define INITIAL_WRITES_SIZE 4
// Transaction life cycle as used below: txn_begin() starts in TXN_RUNNING, txn_commit() switches
// to TXN_VALIDATING, and txn_validate() ends in either TXN_VALIDATED or TXN_ABORTED.
12 typedef enum { TXN_RUNNING, TXN_VALIDATING, TXN_VALIDATED, TXN_ABORTED } txn_state_t;
// Kind of change an update record carries. txn_ht_put() writes UPDATE_TYPE_PUT; txn_ht_get()
// reports DOES_NOT_EXIST when the selected record is an UPDATE_TYPE_DELETE.
13 typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t;
// Forward declaration so that an update record can point at another update record.
15 typedef struct update_rec update_rec_t;
// Link to the next-older update of the same key; txn_ht_put() pushes new records at the head, so
// following <prev> walks from newest to oldest.
21 update_rec_t *prev; // a previous update
// One entry of a transaction's write set. The code below uses its <key> and <rec> members.
24 typedef struct write_rec {
// Number of entries of the <writes> array that are currently in use.
35 uint32_t writes_count;
// Isolation level handed to txn_begin().
38 txn_isolation_t isolation;
// Global version counter: sampled as a transaction's read version (rv) in txn_begin() and
// advanced with SYNC_ADD in txn_validate() to hand out write versions (wv).
42 uint64_t GlobalVersion = 1;
// NOTE(review): not referenced by any of the visible code -- confirm where it is maintained.
43 uint64_t MinActiveTxnVersion = 0;
// Declared ahead of its definition because txn_ht_get() and txn_ht_validate_key() call it to help
// another transaction finish validating.
45 static txn_state_t txn_validate (txn_t *txn);
// Allocate one update record with nbd_malloc() and zero-fill it, so every field (including <prev>
// and <version>) starts out as 0 until the caller fills it in.
// NOTE(review): the pointer goes straight into memset() without a NULL check -- confirm that
// nbd_malloc() cannot return NULL.
47 update_rec_t *alloc_update_rec (void) {
48 update_rec_t *u = (update_rec_t *)nbd_malloc(sizeof(update_rec_t));
49 memset(u, 0, sizeof(update_rec_t));
// Create a new transaction object.
//
// The txn_t is allocated with nbd_malloc() and zero-filled, then initialised with:
//   - isolation: the caller's <isolation> argument
//   - rv:        the current value of GlobalVersion (the transaction's read version)
//   - wv:        UNDETERMINED_VERSION (assigned later by txn_validate())
//   - state:     TXN_RUNNING
// A write-set array of INITIAL_WRITES_SIZE entries is allocated unless <isolation> equals
// TXN_READ_ONLY; in that case <writes> and <writes_size> keep their zero-filled values.
//
// NOTE(review): <access> and <ht> are not used in the visible lines; presumably they are stored
// on a line that is not shown here -- confirm.
// NOTE(review): TXN_READ_ONLY is compared against <isolation>. By its name it looks like an access
// mode rather than an isolation level -- confirm whether <access> was meant.
53 txn_t *txn_begin (txn_access_t access, txn_isolation_t isolation, hashtable_t *ht) {
54 txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t));
55 memset(txn, 0, sizeof(txn_t));
57 txn->isolation = isolation;
// Snapshot of the global version: reads are resolved relative to this value.
58 txn->rv = GlobalVersion;
59 txn->wv = UNDETERMINED_VERSION;
60 txn->state = TXN_RUNNING;
62 if (isolation != TXN_READ_ONLY) {
// sizeof(*txn->writes) keeps the element size tied to the array's declared type.
63 txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE);
64 txn->writes_size = INITIAL_WRITES_SIZE;
// Look up <key> (of length <key_len>) on behalf of <txn>.
//
// Walks the key's update list, newest first, and picks the first record that is visible to the
// transaction. Returns that record's value, or DOES_NOT_EXIST when the record is a delete or when
// no record qualifies.
69 // Get most recent committed version prior to our read version.
70 int64_t txn_ht_get (txn_t *txn, const char *key, uint32_t key_len) {
72 // Iterate through update records associated with <key> to find the latest committed version.
73 // We can use the first matching version. Older updates always come later in the list.
74 update_rec_t *update = (update_rec_t *) ht_get(txn->ht, key, key_len);
75 for (; update != NULL; update = update->prev) {
// Snapshot the version field once; it is either a version number or a tagged txn pointer.
76 uint64_t writer_version = update->version;
// NOTE(review): strict '<' here, while the tagged-writer test below and txn_ht_validate_key()
// use '<=' against <rv> -- confirm which boundary is intended.
77 if (writer_version < txn->rv)
80 // If the version is tagged, it means that it is not a version number, but a pointer to an
81 // in progress transaction.
// NOTE(review): this re-reads update->version instead of testing the <writer_version> snapshot.
// txn_commit()/txn_abort() overwrite that field, so the two reads can observe different values --
// confirm this is intended.
82 if (IS_TAGGED(update->version)) {
83 txn_t *writer = (txn_t *)STRIP_TAG(writer_version);
86 return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
88 // Skip updates from aborted transactions.
89 txn_state_t writer_state = writer->state;
90 if (EXPECT_FALSE(writer_state == TXN_ABORTED))
// The writer is in the middle of committing: help it finish so its outcome is known.
93 if (writer_state == TXN_VALIDATING) {
94 writer_state = txn_validate(writer);
// Use the writer's update only if it validated with a write version no newer than our read
// version.
97 if (writer_state == TXN_VALIDATED && writer->wv <= txn->rv && writer->wv != UNDETERMINED_VERSION)
98 return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
101 return DOES_NOT_EXIST;
// Check one key of <txn>'s write set for conflicting writers.
//
// Walks the key's update list, newest first. Reaching a record whose version is at or below the
// transaction's read version ends the walk with TXN_VALIDATED, since everything further down the
// list is older. txn_validate() treats a TXN_ABORTED result from this function as a conflict.
// NOTE(review): the statements that return TXN_ABORTED and that skip a record (aborted writer,
// our own update) are not visible in this excerpt -- confirm them against the full file.
104 // Validate the updates for <key>. Validation fails for a key we have written to if there is a
105 // write committed newer than our read version.
106 static txn_state_t txn_ht_validate_key (txn_t *txn, const char *key, uint32_t key_len) {
108 update_rec_t *update = (update_rec_t *) ht_get(txn->ht, key, key_len);
109 for (; update != NULL; update = update->prev) {
// Snapshot the version field once; all tests below use this local copy.
110 uint64_t writer_version = update->version;
111 if (writer_version <= txn->rv)
112 return TXN_VALIDATED;
114 // If the version is tagged, it means it is a pointer to a transaction in progress.
115 if (IS_TAGGED(writer_version)) {
// TAG_VALUE(0) is the marker txn_abort()/txn_commit() write into updates of aborted transactions.
117 // Skip aborted transactions.
118 if (EXPECT_FALSE(writer_version == TAG_VALUE(0)))
121 // Skip our own updates.
122 txn_t *writer = (txn_t *)STRIP_TAG(writer_version);
// From here on <writer_version> holds the writer's write version rather than the tagged pointer.
126 writer_version = writer->wv;
127 if (writer_version <= txn->rv && writer_version != UNDETERMINED_VERSION)
128 return TXN_VALIDATED;
130 txn_state_t writer_state = writer->state;
131 if (EXPECT_FALSE(writer_state == TXN_ABORTED))
134 // Help validate <writer> if it is a committing transaction that might cause us to
135 // abort. However, if the <writer> has a later version than us we can safely ignore its
136 // updates. This protocol ensures a deterministic resolution to every conflict, and
137 // avoids infinite ping-ponging between validating two conflicting transactions.
138 if (writer_state == TXN_VALIDATING && (writer_version < txn->wv ||
139 writer_version == UNDETERMINED_VERSION)) {
140 writer_state = txn_validate(writer);
143 if (writer_state == TXN_VALIDATED)
// Reached when no visible check above ended the walk: report the key as valid.
150 return TXN_VALIDATED;
// Drive <txn> through commit-time validation and return its resulting state.
//
// Called by txn_commit() on the committing transaction, and also by txn_ht_get() and
// txn_ht_validate_key() on some other transaction they found in TXN_VALIDATING state.
// Steps visible here:
//   1. If no write version has been chosen yet, take a new one from GlobalVersion and install it
//      with a compare-and-swap against UNDETERMINED_VERSION.
//   2. Validate every key in the write set with txn_ht_validate_key().
//   3. Mark the transaction TXN_ABORTED when a key fails, otherwise TXN_VALIDATED.
// NOTE(review): the case labels of the switch and the final return are not visible in this
// excerpt -- confirm them against the full file.
153 static txn_state_t txn_validate (txn_t *txn) {
155 switch (txn->state) {
158 if (txn->wv == UNDETERMINED_VERSION) {
// The value returned by SYNC_ADD becomes the candidate write version.
159 uint64_t wv = SYNC_ADD(&GlobalVersion, 1);
// The CAS only replaces UNDETERMINED_VERSION, so a write version that is already set is kept.
160 SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv);
163 for (i = 0; i < txn->writes_count; ++i) {
// NOTE(review): the key length is recomputed with strlen(), whereas txn_ht_put() receives an
// explicit <key_len> -- confirm that keys are always NUL-terminated strings.
164 txn_state_t s = txn_ht_validate_key(txn, txn->writes[i].key, strlen(txn->writes[i].key));
165 if (s == TXN_ABORTED) {
166 txn->state = TXN_ABORTED;
// Only promote to VALIDATED if the state was not changed to something else in the meantime.
170 if (txn->state == TXN_VALIDATING) {
171 txn->state = TXN_VALIDATED;
// Abort <txn>: stamp every update record in its write set with TAG_VALUE(0), the marker that
// txn_ht_validate_key() recognises as "written by an aborted transaction", then hand the
// write-set array to nbd_defer_free().
// NOTE(review): any state check or release of <txn> itself is not visible in this excerpt.
186 void txn_abort (txn_t *txn) {
189 for (i = 0; i < txn->writes_count; ++i) {
190 update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
// Replaces the tagged pointer to <txn> that txn_ht_put() stored in the record.
191 update->version = TAG_VALUE(0);
194 nbd_defer_free(txn->writes);
// Try to commit <txn>, which must be in TXN_RUNNING state (asserted).
//
// Switches the transaction to TXN_VALIDATING, runs txn_validate(), and then replaces the tagged
// txn pointer in every update record of the write set with the final version: the transaction's
// write version on success, or TAG_VALUE(0) when validation aborted it. Finally the write-set
// array is handed to nbd_defer_free().
// NOTE(review): the return statement is not visible in this excerpt; presumably the local
// <state> is returned -- confirm.
198 txn_state_t txn_commit (txn_t *txn) {
200 assert(txn->state == TXN_RUNNING);
201 txn->state = TXN_VALIDATING;
202 txn_state_t state = txn_validate(txn);
204 // Detach <txn> from its updates.
// NOTE(review): this reads txn->state again rather than the local <state> returned above --
// confirm the two cannot differ.
205 uint64_t wv = (txn->state == TXN_ABORTED) ? TAG_VALUE(0) : txn->wv;
207 for (i = 0; i < txn->writes_count; ++i) {
208 update_rec_t *update = (update_rec_t *)txn->writes[i].rec;
// After this store readers see a plain version number (or the aborted marker) instead of a
// pointer to <txn>.
209 update->version = wv;
212 nbd_defer_free(txn->writes);
// Record a write of <value> to <key> (of length <key_len>) inside <txn>.
//
// A new update record of type UPDATE_TYPE_PUT is created, tagged with a pointer to <txn> as its
// version, and pushed onto the head of the key's update list in the hashtable. The key and the
// record are then appended to the transaction's write set for commit-time validation.
//
// The <key> pointer itself is stored in the write set (no copy is made) and is read again by
// txn_validate(), so the caller's key buffer has to stay valid until then.
// NOTE(review): if <txn> has no write array (txn_begin() skips the allocation in one case),
// writes_count and writes_size are both 0 and the stores at the end go through an unallocated
// <writes> -- confirm such transactions never reach this function.
218 void txn_ht_put (txn_t *txn, const char *key, uint32_t key_len, int64_t value) {
220 // create a new update record
221 update_rec_t *update = alloc_update_rec();
222 update->type = UPDATE_TYPE_PUT;
223 update->value = value;
// A tagged version marks the record as belonging to a transaction that has not finished yet.
224 update->version = TAG_VALUE((uint64_t)txn);
226 // push the new update record onto <key>'s update list
// Retry until the compare-and-set installs <update> on top of the head that was just read.
229 update->prev = (update_rec_t *) ht_get(txn->ht, key, key_len);
230 update_prev = (int64_t)update->prev;
231 } while (ht_compare_and_set(txn->ht, key, key_len, update_prev, (int64_t)update) != update_prev);
233 // add <key> to the write set for commit-time validation
// The write set is full: build an array of twice the size and copy the existing entries over.
234 if (txn->writes_count == txn->writes_size) {
235 write_rec_t *w = nbd_malloc(sizeof(write_rec_t) * txn->writes_size * 2);
236 memcpy(w, txn->writes, txn->writes_size * sizeof(write_rec_t));
237 txn->writes_size *= 2;
// NOTE(review): in the visible lines <w> is never stored into txn->writes and the old array is
// never released. If that assignment is really absent, the stores below index past the end of
// the old array -- confirm against the full file.
239 int i = txn->writes_count++;
240 txn->writes[i].key = key;
241 txn->writes[i].rec = update;