static txn_state_e txn_validate (txn_t *txn);
-static map_t *active_ = NULL;
+static skiplist_t *active_ = NULL;
void txn_init (void) {
- active_ = map_alloc(&sl_map_impl, NULL);
+ active_ = sl_alloc(NULL);
}
// Validate the updates for <key>. Validation fails if there is a write-write conflict. That is if after our
uint64_t temp = 0;
do {
old_count = temp;
- temp = (uint64_t)map_cas(active_, (void *)txn->rv, old_count, old_count + 1);
+ temp = (uint64_t)sl_cas(active_, (void *)txn->rv, old_count, old_count + 1);
} while (temp != old_count);
if (txn->rv == version_)
temp = 1;
do {
old_count = temp;
- temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+ temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
} while (temp != old_count);
} while (1);
update->version = wv;
}
- /*
// Lower the reference count for <txn>'s read version
- uint64_t temp = 1;
+ uint64_t temp = 2;
uint64_t old_count;
do {
old_count = temp;
- temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+ temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+ if (temp == 1 && txn->rv != version_) {
+ sl_remove(active_, (void *)txn->rv);
+ break;
+ }
} while (old_count != temp);
- if (old_count == 0 && version_ != txn->rv) {
- map_remove(active_, (void *)txn->rv);
- }
- */
nbd_defer_free(txn->writes);
nbd_defer_free(txn);
// If the update's version is not tagged it means the update is committed.
if (!IS_TAGGED(update->version)) {
if (update->version <= txn->rv)
- return update->value;
+ break; // success
continue;
}
// The update's transaction is still in progress. Access its txn_t.
txn_t *writer = (txn_t *)STRIP_TAG(update->version);
if (writer == txn) // found our own update
- return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
+ break; // success
txn_state_e writer_state = writer->state;
if (writer_state == TXN_RUNNING)
assert(writer_state == TXN_VALIDATED);
if (writer->wv > txn->rv)
continue;
- return update->value;
+ break; // success
}
- return DOES_NOT_EXIST;
+
+ if (EXPECT_FALSE(update == NULL))
+ return DOES_NOT_EXIST;
+
+ // collect some garbage
+ update_rec_t *next = update->next;
+ if (next != NULL) {
+ uint64_t min_active_version = (uint64_t)sl_min_key(active_);
+ if (next->version < min_active_version) {
+ next = SYNC_SWAP(&update->next, NULL);
+ while (next != NULL) {
+ update = next;
+ next = NULL;
+ if (update->next != NULL) {
+ next = SYNC_SWAP(&update->next, NULL);
+ }
+ nbd_free(update);
+ }
+ }
+ }
+
+ return update->value;
}
void tm_set (txn_t *txn, void *key, uint64_t value) {