+// Get most recent committed version prior to our read version.
+uint64_t tm_get (txn_t *txn, void *key) {
+ if (txn->state != TXN_RUNNING)
+ return ERROR_TXN_NOT_RUNNING;
+
+ update_t *newest_update = (update_t *) map_get(txn->map, key);
+ if (!IS_TAGGED(newest_update, TAG2))
+ return (uint64_t)newest_update;
+
+ // Iterate through the update records to find the latest committed version prior to our read version.
+ update_t *update;
+ for (update = newest_update; ; update = update->next) {
+
+ if (!IS_TAGGED(update, TAG2))
+ return (uint64_t)update;
+
+ update = (update_t *)STRIP_TAG(update, TAG2);
+ assert(update != NULL);
+
+ // If the update's version is not tagged it means the update is committed.
+ if (!IS_TAGGED(update->version, TAG1)) {
+ if (update->version <= txn->rv)
+ break; // success
+ continue;
+ }
+
+ // If the update's version is tagged then either the update was aborted or the the version number is
+ // actually a pointer to a running transaction's txn_t.
+
+ // Skip updates from aborted transactions.
+ if (EXPECT_FALSE(update->version == ABORTED_VERSION))
+ continue;
+
+ // The update's transaction is still in progress. Access its txn_t.
+ txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1);
+ if (writer == txn) // found our own update
+ break; // success
+
+ txn_state_e writer_state = writer->state;
+ if (writer_state == TXN_RUNNING)
+ continue;
+
+ if (writer_state == TXN_VALIDATING) {
+ if (writer->wv > txn->rv)
+ continue;
+ writer_state = txn_validate(writer);
+ }
+
+ // Skip updates from aborted transactions.
+ if (writer_state == TXN_ABORTED)
+ continue;
+
+ assert(writer_state == TXN_VALIDATED);
+ if (writer->wv > txn->rv)
+ continue;
+ break; // success
+ }
+
+ uint64_t value = update->value;
+
+ // collect some garbage
+ update_t *last = update;
+ update_t *next = update->next;
+ uint64_t min_active = 0;
+ if (IS_TAGGED(next, TAG2)) {
+ next = (update_t *)STRIP_TAG(next, TAG2);
+ min_active = (uint64_t)sl_min_key(active_);
+ if (next->version < min_active) {
+
+ // Skip over aborted versions to verify the chain of updates is old enough for collection
+ update_t *temp = next;
+ while (temp->version == ABORTED_VERSION) {
+ assert(!IS_TAGGED(temp->version, TAG1));
+ update_t *temp = next->next;
+ if (!IS_TAGGED(temp, TAG2))
+ break;
+ temp = (update_t *)STRIP_TAG(temp, TAG2);
+ if (temp->version >= min_active)
+ return value;
+ temp = temp->next;
+ }
+
+ // collect <next> and all the update records following it
+ do {
+ next = SYNC_SWAP(&update->next, NULL);
+
+ // if we find ourself in a race just back off and let the other thread take care of it
+ if (next == NULL)
+ return value;
+
+ update = next;
+ next = next->next;
+ nbd_free(update);
+ } while (IS_TAGGED(next, TAG2));
+ }
+ }
+
+ // If there is one item left and it is visible by all active transactions we can merge it into the map itself.
+ // There is no need for an update record.
+ if (next == NULL && last == (update_t *)STRIP_TAG(newest_update, TAG2)) {
+ if (min_active == UNDETERMINED_VERSION) {
+ min_active = (uint64_t)sl_min_key(active_);
+ }
+ if (last->version <= min_active) {
+ if (map_cas(txn->map, key, TAG_VALUE(last, TAG2), value) == TAG_VALUE(last, TAG2)) {
+ nbd_defer_free(last);
+ }
+ }
+ }
+
+ return value;
+}
+
+void tm_set (txn_t *txn, void *key, uint64_t value) {
+ if (txn->state != TXN_RUNNING)
+ return; // TODO: return some sort of error code