From f7a1c10d18dcc2654d0c9b1f5ffc9f4ec9b23776 Mon Sep 17 00:00:00 2001 From: jdybnis Date: Sun, 7 Dec 2008 10:31:08 +0000 Subject: [PATCH] in txn, clean up old update records when they can't be referenced anymore --- include/skiplist.h | 3 ++- map/skiplist.c | 11 ++++++++++ txn/txn.c | 50 ++++++++++++++++++++++++++++++++-------------- 3 files changed, 48 insertions(+), 16 deletions(-) diff --git a/include/skiplist.h b/include/skiplist.h index ca6b514..08d32cf 100644 --- a/include/skiplist.h +++ b/include/skiplist.h @@ -5,13 +5,14 @@ typedef struct sl skiplist_t; -skiplist_t *sl_alloc (const datatype_t *key_type); +skiplist_t * sl_alloc (const datatype_t *key_type); uint64_t sl_cas (skiplist_t *sl, void *key, uint64_t expected_val, uint64_t new_val); uint64_t sl_lookup (skiplist_t *sl, void *key); uint64_t sl_remove (skiplist_t *sl, void *key); uint64_t sl_count (skiplist_t *sl); void sl_print (skiplist_t *sl); void sl_free (skiplist_t *sl); +void * sl_min_key(skiplist_t *sl); static const map_impl_t sl_map_impl = { (map_alloc_t)sl_alloc, (map_cas_t)sl_cas, (map_get_t)sl_lookup, (map_remove_t)sl_remove, diff --git a/map/skiplist.c b/map/skiplist.c index 52d7f1a..49ef21b 100644 --- a/map/skiplist.c +++ b/map/skiplist.c @@ -231,6 +231,17 @@ uint64_t sl_lookup (skiplist_t *sl, void *key) { return DOES_NOT_EXIST; } +void *sl_min_key (skiplist_t *sl) { + node_t *item = sl->head->next[0]; + while (item != NULL) { + node_t *next = item->next[0]; + if (!IS_TAGGED(next)) + return item->key; + item = (node_t *)STRIP_TAG(next); + } + return DOES_NOT_EXIST; +} + uint64_t sl_cas (skiplist_t *sl, void *key, uint64_t expectation, uint64_t new_val) { TRACE("s1", "sl_cas: key %p skiplist %p", key, sl); TRACE("s1", "sl_cas: expectation %p new value %p", expectation, new_val); diff --git a/txn/txn.c b/txn/txn.c index 6e6b797..991c22c 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -43,10 +43,10 @@ static uint64_t version_ = 1; static txn_state_e txn_validate (txn_t *txn); -static map_t *active_ = NULL; +static skiplist_t *active_ = NULL; void txn_init (void) { - active_ = map_alloc(&sl_map_impl, NULL); + active_ = sl_alloc(NULL); } // Validate the updates for . Validation fails if there is a write-write conflict. That is if after our @@ -169,7 +169,7 @@ txn_t *txn_begin (txn_type_e type, map_t *map) { uint64_t temp = 0; do { old_count = temp; - temp = (uint64_t)map_cas(active_, (void *)txn->rv, old_count, old_count + 1); + temp = (uint64_t)sl_cas(active_, (void *)txn->rv, old_count, old_count + 1); } while (temp != old_count); if (txn->rv == version_) @@ -178,7 +178,7 @@ txn_t *txn_begin (txn_type_e type, map_t *map) { temp = 1; do { old_count = temp; - temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1); + temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1); } while (temp != old_count); } while (1); @@ -211,18 +211,17 @@ txn_state_e txn_commit (txn_t *txn) { update->version = wv; } - /* // Lower the reference count for 's read version - uint64_t temp = 1; + uint64_t temp = 2; uint64_t old_count; do { old_count = temp; - temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1); + temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1); + if (temp == 1 && txn->rv != version_) { + sl_remove(active_, (void *)txn->rv); + break; + } } while (old_count != temp); - if (old_count == 0 && version_ != txn->rv) { - map_remove(active_, (void *)txn->rv); - } - */ nbd_defer_free(txn->writes); nbd_defer_free(txn); @@ -241,7 +240,7 @@ uint64_t tm_get (txn_t *txn, void *key) { // If the update's version is not tagged it means the update is committed. if (!IS_TAGGED(update->version)) { if (update->version <= txn->rv) - return update->value; + break; // success continue; } @@ -255,7 +254,7 @@ uint64_t tm_get (txn_t *txn, void *key) { // The update's transaction is still in progress. Access its txn_t. txn_t *writer = (txn_t *)STRIP_TAG(update->version); if (writer == txn) // found our own update - return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; + break; // success txn_state_e writer_state = writer->state; if (writer_state == TXN_RUNNING) @@ -274,9 +273,30 @@ uint64_t tm_get (txn_t *txn, void *key) { assert(writer_state == TXN_VALIDATED); if (writer->wv > txn->rv) continue; - return update->value; + break; // success } - return DOES_NOT_EXIST; + + if (EXPECT_FALSE(update == NULL)) + return DOES_NOT_EXIST; + + // collect some garbage + update_rec_t *next = update->next; + if (next != NULL) { + uint64_t min_active_version = (uint64_t)sl_min_key(active_); + if (next->version < min_active_version) { + next = SYNC_SWAP(&update->next, NULL); + while (next != NULL) { + update = next; + next = NULL; + if (update->next != NULL) { + next = SYNC_SWAP(&update->next, NULL); + } + nbd_free(update); + } + } + } + + return update->value; } void tm_set (txn_t *txn, void *key, uint64_t value) { -- 2.40.0