+// Get most recent committed version prior to our read version.
+map_val_t txn_map_get (txn_t *txn, map_key_t key) {
+ if (txn->state != TXN_RUNNING)
+ return ERROR_TXN_NOT_RUNNING;
+
+ // Iterate through the update records to find the latest committed version prior to our read version.
+ map_val_t newest_val = map_get(txn->map, key);
+ map_val_t val = newest_val;
+ update_t *update = NULL;
+ for ( ; ; val = update->next) {
+
+ if (!IS_TAGGED(val, TAG2))
+ return val;
+
+ update = (update_t *)STRIP_TAG(val, TAG2);
+ assert(update != NULL);
+
+ // If the update's version is not tagged it means the update is committed.
+ if (!IS_TAGGED(update->version, TAG1)) {
+ if (update->version <= txn->rv)
+ break; // success
+ continue;
+ }
+
+ // If the update's version is tagged then either the update was aborted or the the version number is
+ // actually a pointer to a running transaction's txn_t.
+
+ // Skip updates from aborted transactions.
+ if (EXPECT_FALSE(update->version == ABORTED_VERSION))
+ continue;
+
+ // The update's transaction is still in progress. Access its txn_t.
+ txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1);
+ if (writer == txn) // found our own update
+ break; // success
+
+ txn_state_e writer_state = writer->state;
+ if (writer_state == TXN_RUNNING)
+ continue;
+
+ if (writer_state == TXN_VALIDATING) {
+ if (writer->wv > txn->rv)
+ continue;
+ writer_state = txn_validate(writer);
+ }
+
+ // Skip updates from aborted transactions.
+ if (writer_state == TXN_ABORTED)
+ continue;
+
+ assert(writer_state == TXN_VALIDATED);
+ if (writer->wv > txn->rv)
+ continue;
+ break; // success
+ }
+
+ map_val_t value = update->value;
+
+ // collect some garbage
+ version_t min_active_version = UNDETERMINED_VERSION;
+ update_t *next_update = NULL;
+ if (IS_TAGGED(update->next, TAG2)) {
+ next_update = (update_t *)STRIP_TAG(update->next, TAG2);
+ min_active_version = (version_t)sl_min_key(active_);
+ if (next_update->version < min_active_version) {
+ // <next_update> (and all update records following it [execpt if it is aborted]) is old enough that it is
+ // not visible to any active transaction. We can safely free it.
+
+ // Skip over aborted versions to look for more recent updates
+ update_t *temp = next_update;
+ while (temp->version == ABORTED_VERSION) {
+ assert(!IS_TAGGED(temp->version, TAG1));
+ map_val_t next = next_update->next;
+ if (!IS_TAGGED(next, TAG2))
+ break;
+
+ temp = (update_t *)STRIP_TAG(next, TAG2);
+ if (temp->version >= min_active_version)
+ return value;
+ }
+
+ // free <next> and all the update records following it
+ temp = next_update;
+ while (1) {
+ map_val_t next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST);
+
+ // if we find ourself in a race just back off and let the other thread take care of it
+ if (next == DOES_NOT_EXIST)
+ return value;
+
+ if (!IS_TAGGED(next, TAG2))
+ break;
+
+ temp = (update_t *)STRIP_TAG(next, TAG2);
+ nbd_free(update);
+ }
+ }
+ }
+
+ // If there is one item left and it is visible by all active transactions we can merge it into the map itself.
+ // There is no need for an update record.
+ if (next_update == NULL && val == newest_val) {
+ if (min_active_version == UNDETERMINED_VERSION) {
+ min_active_version = (version_t)sl_min_key(active_);
+ }
+ if (update->version <= min_active_version) {
+ if (map_cas(txn->map, key, TAG_VALUE(val, TAG2), value) == TAG_VALUE(val, TAG2)) {
+ rcu_defer_free(update);
+ }
+ }
+ }
+
+ return value;
+}
+
+void txn_map_set (txn_t *txn, map_key_t key, map_val_t value) {
+ if (txn->state != TXN_RUNNING)
+ return; // TODO: return some sort of error code