+// Get most recent committed version prior to our read version.
+map_val_t txn_map_get (txn_t *txn, map_key_t key) {
+ TRACE("x1", "txn_map_get: txn %p map %p", txn, txn->map);
+ TRACE("x1", "txn_map_get: key %p", key, 0);
+
+ if (txn->state != TXN_RUNNING) {
+ TRACE("x1", "txn_map_get: error txn not running (state %p)", txn->state, 0);
+ return ERROR_TXN_NOT_RUNNING;
+ }
+
+ // Iterate through the update records to find the latest committed version prior to our read version.
+ map_val_t newest_val = map_get(txn->map, key);
+ map_val_t val = newest_val;
+ update_t *update;
+ for ( ; (update = VAL_TO_PTR(val)) != NULL ; val = update->next) {
+
+ // If TAG2 is set in <val> it indicates that <val> is an update record. Otherwise all the following are
+ // true: <val> is a literal value, it is older than any currently active transaction, and it is the most
+ // recently set value for its key. Therefore it is visible to <txn>.
+ if (!IS_TAGGED(val, TAG2)) {
+ TRACE("x1", "txn_map_get: found untagged value; returning %p", val, 0);
+ return val;
+ }
+
+ // If the update's version is not tagged it means the update is committed.
+ if (!IS_TAGGED(update->version, TAG1)) {
+ if (update->version <= txn->rv) {
+ TRACE("x2", "txn_map_get: found committed update %p (version %p)", update, update->version);
+ break; // success
+ }
+ TRACE("x2", "txn_map_get: skipping update %p (version %p)", update, update->version);
+ continue;
+ }
+
+ // If the update's version is tagged then either the update was aborted or the the version number is
+ // actually a pointer to a running transaction's txn_t.
+
+ // Skip updates from aborted transactions.
+ if (EXPECT_FALSE(update->version == ABORTED_VERSION)) {
+ TRACE("x2", "txn_map_get: skipping aborted update %p", update, 0);
+ continue;
+ }
+
+ // The update's transaction is still in progress. Access its txn_t.
+ txn_t *writer = (txn_t *)VAL_TO_PTR(update->version);
+ if (writer == txn) {
+ TRACE("x2", "txn_map_get: found txn's own update %p", update, 0);
+ break; // success
+ }
+
+ txn_state_e writer_state = writer->state;
+ if (writer_state == TXN_RUNNING) {
+ TRACE("x2", "txn_map_get: skipping update %p of in-progress transaction %p", update, writer);
+ continue;
+ }
+
+ if (writer_state == TXN_VALIDATING) {
+ TRACE("x2", "txn_map_get: update %p transaction %p validating", update, writer);
+ if (writer->wv > txn->rv)
+ continue;
+ writer_state = txn_validate(writer);
+ }
+
+ // Skip updates from aborted transactions.
+ if (writer_state == TXN_ABORTED) {
+ TRACE("x2", "txn_map_get: skipping aborted update %p", update, 0);
+ continue;
+ }
+
+ assert(writer_state == TXN_VALIDATED);
+ if (writer->wv > txn->rv) {
+ TRACE("x2", "txn_map_get: skipping update %p (version %p)", update, update->version);
+ continue;
+ }
+ break; // success
+ }
+
+ if (update == NULL) {
+ TRACE("x1", "txn_map_get: key does not exist in map", key, 0);
+ return DOES_NOT_EXIST;
+ }
+
+ map_val_t value = update->value;
+ TRACE("x1", "txn_map_get: key found returning value %p", value, 0);
+
+ return value;
+ // collect some garbage
+ version_t min_active_version = UNDETERMINED_VERSION;
+ update_t *next_update = NULL;
+ if (IS_TAGGED(update->next, TAG2)) {
+ next_update = VAL_TO_PTR(update->next);
+
+ // If <next_update> (and all update records following it [execpt if it is aborted]) is old enough
+ // that it is not visible to any active transaction we can safely free it.
+ min_active_version = (version_t)sl_min_key(active_);
+ if (next_update->version < min_active_version) {
+
+ // If the <next_update> is aborted, skip over it to look for more recent ones that may follow
+ update_t *temp = next_update;
+ while (temp->version == ABORTED_VERSION) {
+ assert(!IS_TAGGED(temp->version, TAG1));
+ map_val_t next = temp->next;
+ if (!IS_TAGGED(next, TAG2))
+ break;
+
+ // Bail out of garbage collection if we find a record that might still be accessed by an
+ // ongoing transaction.
+ if (VAL_TO_PTR(next)->version >= min_active_version)
+ return value;
+
+ temp = VAL_TO_PTR(next);
+ }
+
+ // free the next update record and all the ones following it
+ temp = next_update;
+ map_val_t next;
+ do {
+ next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST);
+
+ // if we find ourself in a race just back off and let the other thread take care of it
+ if (next == DOES_NOT_EXIST)
+ return value;
+
+ nbd_free(temp);
+
+ temp = VAL_TO_PTR(next);
+
+ } while (IS_TAGGED(next, TAG2));
+ }
+ }
+
+ // If there is one item left and it is visible by all active transactions we can merge it into the map itself.
+ // There is no need for an update record.
+ if (next_update == NULL && val == newest_val) {
+ if (min_active_version == UNDETERMINED_VERSION) {
+ min_active_version = (version_t)sl_min_key(active_);
+ }
+ if (update->version <= min_active_version) {
+ if (map_cas(txn->map, key, TAG_VALUE(val, TAG2), value) == TAG_VALUE(val, TAG2)) {
+ rcu_defer_free(update);
+ }
+ }
+ }
+
+ return value;
+}
+
+void txn_map_set (txn_t *txn, map_key_t key, map_val_t value) {
+ TRACE("x1", "txn_map_set: txn %p map %p", txn, txn->map);
+ TRACE("x1", "txn_map_set: key %p value %p", key, value);
+ assert(!IS_TAGGED(value, TAG1) && !IS_TAGGED(value, TAG2));
+
+ if (txn->state != TXN_RUNNING) {
+ TRACE("x1", "txn_map_set: error txn not running (state %p)", txn->state, 0);
+ return;
+ }