X-Git-Url: https://pd.if.org/git/?p=nbds;a=blobdiff_plain;f=txn%2Ftxn.c;fp=txn%2Ftxn.c;h=991c22c8a8351002922df123cc6413c925676a5f;hp=6e6b7973703f9b5792e6dbef772a1c6ab0895be3;hb=f7a1c10d18dcc2654d0c9b1f5ffc9f4ec9b23776;hpb=b0f5164fac83e2ad24ab1f56d5c1f022279372ab diff --git a/txn/txn.c b/txn/txn.c index 6e6b797..991c22c 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -43,10 +43,10 @@ static uint64_t version_ = 1; static txn_state_e txn_validate (txn_t *txn); -static map_t *active_ = NULL; +static skiplist_t *active_ = NULL; void txn_init (void) { - active_ = map_alloc(&sl_map_impl, NULL); + active_ = sl_alloc(NULL); } // Validate the updates for . Validation fails if there is a write-write conflict. That is if after our @@ -169,7 +169,7 @@ txn_t *txn_begin (txn_type_e type, map_t *map) { uint64_t temp = 0; do { old_count = temp; - temp = (uint64_t)map_cas(active_, (void *)txn->rv, old_count, old_count + 1); + temp = (uint64_t)sl_cas(active_, (void *)txn->rv, old_count, old_count + 1); } while (temp != old_count); if (txn->rv == version_) @@ -178,7 +178,7 @@ txn_t *txn_begin (txn_type_e type, map_t *map) { temp = 1; do { old_count = temp; - temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1); + temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1); } while (temp != old_count); } while (1); @@ -211,18 +211,17 @@ txn_state_e txn_commit (txn_t *txn) { update->version = wv; } - /* // Lower the reference count for 's read version - uint64_t temp = 1; + uint64_t temp = 2; uint64_t old_count; do { old_count = temp; - temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1); + temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1); + if (temp == 1 && txn->rv != version_) { + sl_remove(active_, (void *)txn->rv); + break; + } } while (old_count != temp); - if (old_count == 0 && version_ != txn->rv) { - map_remove(active_, (void *)txn->rv); - } - */ nbd_defer_free(txn->writes); nbd_defer_free(txn); @@ -241,7 +240,7 @@ uint64_t tm_get (txn_t *txn, void *key) { // If the update's version is not tagged it means the update is committed. if (!IS_TAGGED(update->version)) { if (update->version <= txn->rv) - return update->value; + break; // success continue; } @@ -255,7 +254,7 @@ uint64_t tm_get (txn_t *txn, void *key) { // The update's transaction is still in progress. Access its txn_t. txn_t *writer = (txn_t *)STRIP_TAG(update->version); if (writer == txn) // found our own update - return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; + break; // success txn_state_e writer_state = writer->state; if (writer_state == TXN_RUNNING) @@ -274,9 +273,30 @@ uint64_t tm_get (txn_t *txn, void *key) { assert(writer_state == TXN_VALIDATED); if (writer->wv > txn->rv) continue; - return update->value; + break; // success } - return DOES_NOT_EXIST; + + if (EXPECT_FALSE(update == NULL)) + return DOES_NOT_EXIST; + + // collect some garbage + update_rec_t *next = update->next; + if (next != NULL) { + uint64_t min_active_version = (uint64_t)sl_min_key(active_); + if (next->version < min_active_version) { + next = SYNC_SWAP(&update->next, NULL); + while (next != NULL) { + update = next; + next = NULL; + if (update->next != NULL) { + next = SYNC_SWAP(&update->next, NULL); + } + nbd_free(update); + } + } + } + + return update->value; } void tm_set (txn_t *txn, void *key, uint64_t value) {