typedef struct sl skiplist_t;
-skiplist_t *sl_alloc (const datatype_t *key_type);
+skiplist_t * sl_alloc (const datatype_t *key_type);
uint64_t sl_cas (skiplist_t *sl, void *key, uint64_t expected_val, uint64_t new_val);
uint64_t sl_lookup (skiplist_t *sl, void *key);
uint64_t sl_remove (skiplist_t *sl, void *key);
uint64_t sl_count (skiplist_t *sl);
void sl_print (skiplist_t *sl);
void sl_free (skiplist_t *sl);
+void * sl_min_key(skiplist_t *sl);
static const map_impl_t sl_map_impl = {
(map_alloc_t)sl_alloc, (map_cas_t)sl_cas, (map_get_t)sl_lookup, (map_remove_t)sl_remove,
return DOES_NOT_EXIST;
}
+void *sl_min_key (skiplist_t *sl) {
+ node_t *item = sl->head->next[0];
+ while (item != NULL) {
+ node_t *next = item->next[0];
+ if (!IS_TAGGED(next))
+ return item->key;
+ item = (node_t *)STRIP_TAG(next);
+ }
+ return DOES_NOT_EXIST;
+}
+
uint64_t sl_cas (skiplist_t *sl, void *key, uint64_t expectation, uint64_t new_val) {
TRACE("s1", "sl_cas: key %p skiplist %p", key, sl);
TRACE("s1", "sl_cas: expectation %p new value %p", expectation, new_val);
static txn_state_e txn_validate (txn_t *txn);
-static map_t *active_ = NULL;
+static skiplist_t *active_ = NULL;
void txn_init (void) {
- active_ = map_alloc(&sl_map_impl, NULL);
+ active_ = sl_alloc(NULL);
}
// Validate the updates for <key>. Validation fails if there is a write-write conflict. That is if after our
uint64_t temp = 0;
do {
old_count = temp;
- temp = (uint64_t)map_cas(active_, (void *)txn->rv, old_count, old_count + 1);
+ temp = (uint64_t)sl_cas(active_, (void *)txn->rv, old_count, old_count + 1);
} while (temp != old_count);
if (txn->rv == version_)
temp = 1;
do {
old_count = temp;
- temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+ temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
} while (temp != old_count);
} while (1);
update->version = wv;
}
- /*
// Lower the reference count for <txn>'s read version
- uint64_t temp = 1;
+ uint64_t temp = 2;
uint64_t old_count;
do {
old_count = temp;
- temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+ temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+ if (temp == 1 && txn->rv != version_) {
+ sl_remove(active_, (void *)txn->rv);
+ break;
+ }
} while (old_count != temp);
- if (old_count == 0 && version_ != txn->rv) {
- map_remove(active_, (void *)txn->rv);
- }
- */
nbd_defer_free(txn->writes);
nbd_defer_free(txn);
// If the update's version is not tagged it means the update is committed.
if (!IS_TAGGED(update->version)) {
if (update->version <= txn->rv)
- return update->value;
+ break; // success
continue;
}
// The update's transaction is still in progress. Access its txn_t.
txn_t *writer = (txn_t *)STRIP_TAG(update->version);
if (writer == txn) // found our own update
- return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
+ break; // success
txn_state_e writer_state = writer->state;
if (writer_state == TXN_RUNNING)
assert(writer_state == TXN_VALIDATED);
if (writer->wv > txn->rv)
continue;
- return update->value;
+ break; // success
}
- return DOES_NOT_EXIST;
+
+ if (EXPECT_FALSE(update == NULL))
+ return DOES_NOT_EXIST;
+
+ // collect some garbage
+ update_rec_t *next = update->next;
+ if (next != NULL) {
+ uint64_t min_active_version = (uint64_t)sl_min_key(active_);
+ if (next->version < min_active_version) {
+ next = SYNC_SWAP(&update->next, NULL);
+ while (next != NULL) {
+ update = next;
+ next = NULL;
+ if (update->next != NULL) {
+ next = SYNC_SWAP(&update->next, NULL);
+ }
+ nbd_free(update);
+ }
+ }
+ }
+
+ return update->value;
}
void tm_set (txn_t *txn, void *key, uint64_t value) {