X-Git-Url: https://pd.if.org/git/?p=nbds;a=blobdiff_plain;f=txn%2Ftxn.c;h=2893d35dbcfb40ec5152775146f59224cb96034d;hp=bb304b3921fe40a86c25715059b3524e52481f01;hb=2cce67f0002cdb6dcdc2ab8ccf837e3d2b3336de;hpb=a19bce63ef088ad03004bc8e9bfde4901d978151 diff --git a/txn/txn.c b/txn/txn.c index bb304b3..2893d35 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -14,13 +14,13 @@ typedef struct update_rec update_t; struct update_rec { - update_t *next; // an earlier update uint64_t version; - uint64_t value; + map_val_t value; + map_val_t next; // an earlier update }; typedef struct write_rec { - void *key; + map_key_t key; update_t *rec; } write_rec_t; @@ -29,16 +29,16 @@ struct txn { uint64_t wv; map_t *map; write_rec_t *writes; - uint32_t writes_size; - uint32_t writes_count; - uint32_t writes_scan; + uint64_t writes_size; + uint64_t writes_count; + uint64_t writes_scan; txn_state_e state; }; -static uint64_t version_ = 1; - static txn_state_e txn_validate (txn_t *txn); +static uint64_t version_ = 1; + static skiplist_t *active_ = NULL; void txn_init (void) { @@ -51,20 +51,21 @@ void txn_init (void) { // If we encounter a potential conflict with a transaction that is in the process of validating, we help it // complete validating. It must be finished before we can decide to rollback or commit. // -static txn_state_e validate_key (txn_t *txn, void *key) { +static txn_state_e validate_key (txn_t *txn, map_key_t key) { assert(txn->state != TXN_RUNNING); - update_t *update = (update_t *) map_get(txn->map, key); - for (; update != NULL; update = update->next) { + map_val_t val = map_get(txn->map, key); + update_t *update = NULL; + for (; val != DOES_NOT_EXIST; val = update->next) { // If the update or its version is not tagged it means the update is committed. // // We can stop at the first committed record we find that is at least as old as our read version. All // the other committed records following it will be older. 
And all the uncommitted records following it // will eventually conflict with it and abort. - if (!IS_TAGGED(update, TAG2)) + if (!IS_TAGGED(val, TAG2)) return TXN_VALIDATED; - update = (update_t *)STRIP_TAG(update, TAG2); + update = (update_t *)(size_t)STRIP_TAG(val, TAG2); if (!IS_TAGGED(update->version, TAG1)) return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED; @@ -76,7 +77,7 @@ static txn_state_e validate_key (txn_t *txn, void *key) { continue; // The update's transaction is still in progress. Access its txn_t. - txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1); + txn_t *writer = (txn_t *)(size_t)STRIP_TAG(update->version, TAG1); if (writer == txn) continue; // Skip our own updates. txn_state_e writer_state = writer->state; @@ -168,7 +169,7 @@ txn_t *txn_begin (map_t *map) { uint64_t temp = 0; do { old_count = temp; - temp = (uint64_t)sl_cas(active_, (void *)txn->rv, old_count, old_count + 1); + temp = (uint64_t)sl_cas(active_, (map_key_t)txn->rv, old_count, old_count + 1); } while (temp != old_count); if (txn->rv == version_) @@ -177,7 +178,7 @@ txn_t *txn_begin (map_t *map) { temp = 1; do { old_count = temp; - temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1); + temp = sl_cas(active_, (map_key_t)txn->rv, old_count, old_count - 1); } while (temp != old_count); } while (1); @@ -219,9 +220,9 @@ txn_state_e txn_commit (txn_t *txn) { uint64_t old_count; do { old_count = temp; - temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1); + temp = sl_cas(active_, (map_key_t)txn->rv, old_count, old_count - 1); if (temp == 1 && txn->rv != version_) { - sl_remove(active_, (void *)txn->rv); + sl_remove(active_, (map_key_t)txn->rv); break; } } while (old_count != temp); @@ -233,22 +234,20 @@ txn_state_e txn_commit (txn_t *txn) { } // Get most recent committed version prior to our read version. 
-uint64_t txn_map_get (txn_t *txn, void *key) { +uint64_t txn_map_get (txn_t *txn, map_key_t key) { if (txn->state != TXN_RUNNING) return ERROR_TXN_NOT_RUNNING; - update_t *newest_update = (update_t *) map_get(txn->map, key); - if (!IS_TAGGED(newest_update, TAG2)) - return (uint64_t)newest_update; - // Iterate through the update records to find the latest committed version prior to our read version. - update_t *update; - for (update = newest_update; ; update = update->next) { + uint64_t newest_val = map_get(txn->map, key); + uint64_t val = newest_val; + update_t *update = NULL; + for ( ; ; val = update->next) { - if (!IS_TAGGED(update, TAG2)) - return (uint64_t)update; + if (!IS_TAGGED(val, TAG2)) + return val; - update = (update_t *)STRIP_TAG(update, TAG2); + update = (update_t *)(size_t)STRIP_TAG(val, TAG2); assert(update != NULL); // If the update's version is not tagged it means the update is committed. @@ -266,7 +265,7 @@ uint64_t txn_map_get (txn_t *txn, void *key) { continue; // The update's transaction is still in progress. Access its txn_t. 
- txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1); + txn_t *writer = (txn_t *)(size_t)STRIP_TAG(update->version, TAG1); if (writer == txn) // found our own update break; // success @@ -293,73 +292,77 @@ uint64_t txn_map_get (txn_t *txn, void *key) { uint64_t value = update->value; // collect some garbage - update_t *last = update; - update_t *next = update->next; - uint64_t min_active = 0; - if (IS_TAGGED(next, TAG2)) { - next = (update_t *)STRIP_TAG(next, TAG2); - min_active = (uint64_t)sl_min_key(active_); - if (next->version < min_active) { - - // Skip over aborted versions to verify the chain of updates is old enough for collection - update_t *temp = next; + uint64_t min_active_version = UNDETERMINED_VERSION; + update_t *next_update = NULL; + if (IS_TAGGED(update->next, TAG2)) { + next_update = (update_t *)(size_t)STRIP_TAG(update->next, TAG2); + min_active_version = (uint64_t)sl_min_key(active_); + if (next_update->version < min_active_version) { + // next_update (and all update records following it [except if it is aborted]) is old enough that it is + // not visible to any active transaction. We can safely free it. 
+ + // Skip over aborted versions to look for more recent updates + update_t *temp = next_update; while (temp->version == ABORTED_VERSION) { assert(!IS_TAGGED(temp->version, TAG1)); - update_t *temp = next->next; - if (!IS_TAGGED(temp, TAG2)) + uint64_t next = next_update->next; + if (!IS_TAGGED(next, TAG2)) break; - temp = (update_t *)STRIP_TAG(temp, TAG2); - if (temp->version >= min_active) + + temp = (update_t *)(size_t)STRIP_TAG(next, TAG2); + if (temp->version >= min_active_version) return value; - temp = temp->next; } - // collect and all the update records following it - do { - next = SYNC_SWAP(&update->next, NULL); + // free and all the update records following it + temp = next_update; + while (1) { + uint64_t next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST); // if we find ourself in a race just back off and let the other thread take care of it - if (next == NULL) + if (next == DOES_NOT_EXIST) return value; - update = next; - next = next->next; + if (!IS_TAGGED(next, TAG2)) + break; + + temp = (update_t *)(size_t)STRIP_TAG(next, TAG2); nbd_free(update); - } while (IS_TAGGED(next, TAG2)); + } } } // If there is one item left and it is visible by all active transactions we can merge it into the map itself. // There is no need for an update record. 
- if (next == NULL && last == (update_t *)STRIP_TAG(newest_update, TAG2)) { - if (min_active == UNDETERMINED_VERSION) { - min_active = (uint64_t)sl_min_key(active_); + if (next_update == NULL && val == newest_val) { + if (min_active_version == UNDETERMINED_VERSION) { + min_active_version = (uint64_t)sl_min_key(active_); } - if (last->version <= min_active) { - if (map_cas(txn->map, key, TAG_VALUE(last, TAG2), value) == TAG_VALUE(last, TAG2)) { - nbd_defer_free(last); + if (update->version <= min_active_version) { + if (map_cas(txn->map, key, TAG_VALUE(val, TAG2), value) == TAG_VALUE(val, TAG2)) { + nbd_defer_free(update); } } - } + } return value; } -void txn_map_set (txn_t *txn, void *key, uint64_t value) { +void txn_map_set (txn_t *txn, map_key_t key, uint64_t value) { if (txn->state != TXN_RUNNING) return; // TODO: return some sort of error code // create a new update record update_t *update = alloc_update_rec(); update->value = value; - update->version = TAG_VALUE(txn, TAG1); + update->version = TAG_VALUE((uint64_t)(size_t)txn, TAG1); // push the new update record onto <key>'s update list uint64_t old_update; do { old_update = map_get(txn->map, key); - update->next = (update_t *)old_update; - } while (map_cas(txn->map, key, old_update, TAG_VALUE(update, TAG2)) != old_update); + update->next = old_update; + } while (map_cas(txn->map, key, old_update, TAG_VALUE((uint64_t)(size_t)update, TAG2)) != old_update); // add to the write set for commit-time validation if (txn->writes_count == txn->writes_size) {