X-Git-Url: https://pd.if.org/git/?p=nbds;a=blobdiff_plain;f=txn%2Ftxn.c;fp=txn%2Ftxn.c;h=ea3d6df4ccbb5729fc0447218979b0bafbee703e;hp=991c22c8a8351002922df123cc6413c925676a5f;hb=dbcd4739e02b8e774e28b752c412d7e2f242cd47;hpb=f7a1c10d18dcc2654d0c9b1f5ffc9f4ec9b23776 diff --git a/txn/txn.c b/txn/txn.c index 991c22c..ea3d6df 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -8,23 +8,20 @@ #include "skiplist.h" #define UNDETERMINED_VERSION 0 -#define ABORTED_VERSION TAG_VALUE(0) +#define ABORTED_VERSION TAG_VALUE(0, TAG1) #define INITIAL_WRITES_SIZE 4 -typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t; - -typedef struct update_rec update_rec_t; +typedef struct update_rec update_t; struct update_rec { - update_type_t type; - uint64_t value; + update_t *next; // an earlier update uint64_t version; - update_rec_t *next; // an earlier update + uint64_t value; }; typedef struct write_rec { void *key; - update_rec_t *rec; + update_t *rec; } write_rec_t; struct txn { @@ -57,15 +54,18 @@ void txn_init (void) { // static txn_state_e tm_validate_key (txn_t *txn, void *key) { - update_rec_t *update = (update_rec_t *) map_get(txn->map, key); + update_t *update = (update_t *) map_get(txn->map, key); for (; update != NULL; update = update->next) { - // If the update's version is not tagged it means the update is committed. + // If the update or its version is not tagged it means the update is committed. // // We can stop at the first committed record we find that is at least as old as our read version. All // the other committed records following it will be older. And all the uncommitted records following it // will eventually conflict with it and abort. - if (!IS_TAGGED(update->version)) + if (!IS_TAGGED(update, TAG2)) + return TXN_VALIDATED; + update = (update_t *)STRIP_TAG(update, TAG2); + if (!IS_TAGGED(update->version, TAG1)) return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED; // If the update's version is tagged then either the update was aborted or the the version number is @@ -76,12 +76,12 @@ static txn_state_e tm_validate_key (txn_t *txn, void *key) { continue; // The update's transaction is still in progress. Access its txn_t. - txn_t *writer = (txn_t *)STRIP_TAG(update->version); + txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1); if (writer == txn) continue; // Skip our own updates. txn_state_e writer_state = writer->state; - // Any running transaction will only be able to aquire a wv greater than ours. A transaction changes its + // Any running transaction will only be able to acquire a wv greater than ours. A transaction changes its // state to validating before aquiring a wv. We can ignore an unvalidated transaction if its version is // greater than ours. See next comment below for why. if (writer_state == TXN_RUNNING) @@ -143,9 +143,9 @@ static txn_state_e txn_validate (txn_t *txn) { return txn->state; } -static update_rec_t *alloc_update_rec (void) { - update_rec_t *u = (update_rec_t *)nbd_malloc(sizeof(update_rec_t)); - memset(u, 0, sizeof(update_rec_t)); +static update_t *alloc_update_rec (void) { + update_t *u = (update_t *)nbd_malloc(sizeof(update_t)); + memset(u, 0, sizeof(update_t)); return u; } @@ -161,7 +161,7 @@ txn_t *txn_begin (txn_type_e type, map_t *map) { txn->writes_size = INITIAL_WRITES_SIZE; } - // aquire the read version for txn. must be careful to avoid a race + // acquire the read version for txn. must be careful to avoid a race do { txn->rv = version_; @@ -189,7 +189,7 @@ void txn_abort (txn_t *txn) { int i; for (i = 0; i < txn->writes_count; ++i) { - update_rec_t *update = (update_rec_t *)txn->writes[i].rec; + update_t *update = (update_t *)txn->writes[i].rec; update->version = ABORTED_VERSION; } @@ -207,7 +207,7 @@ txn_state_e txn_commit (txn_t *txn) { uint64_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv; int i; for (i = 0; i < txn->writes_count; ++i) { - update_rec_t *update = (update_rec_t *)txn->writes[i].rec; + update_t *update = (update_t *)txn->writes[i].rec; update->version = wv; } @@ -232,13 +232,22 @@ txn_state_e txn_commit (txn_t *txn) { // Get most recent committed version prior to our read version. uint64_t tm_get (txn_t *txn, void *key) { - // Iterate through update records associated with to find the latest committed version prior to our - // read version. - update_rec_t *update = (update_rec_t *) map_get(txn->map, key); - for (; update != NULL; update = update->next) { + update_t *newest_update = (update_t *) map_get(txn->map, key); + if (!IS_TAGGED(newest_update, TAG2)) + return (uint64_t)newest_update; + + // Iterate through the update records to find the latest committed version prior to our read version. + update_t *update; + for (update = newest_update; ; update = update->next) { + + if (!IS_TAGGED(update, TAG2)) + return (uint64_t)update; + + update = (update_t *)STRIP_TAG(update, TAG2); + assert(update != NULL); // If the update's version is not tagged it means the update is committed. - if (!IS_TAGGED(update->version)) { + if (!IS_TAGGED(update->version, TAG1)) { if (update->version <= txn->rv) break; // success continue; @@ -252,7 +261,7 @@ uint64_t tm_get (txn_t *txn, void *key) { continue; // The update's transaction is still in progress. Access its txn_t. - txn_t *writer = (txn_t *)STRIP_TAG(update->version); + txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1); if (writer == txn) // found our own update break; // success @@ -276,49 +285,82 @@ uint64_t tm_get (txn_t *txn, void *key) { break; // success } - if (EXPECT_FALSE(update == NULL)) - return DOES_NOT_EXIST; + uint64_t value = update->value; // collect some garbage - update_rec_t *next = update->next; - if (next != NULL) { - uint64_t min_active_version = (uint64_t)sl_min_key(active_); - if (next->version < min_active_version) { - next = SYNC_SWAP(&update->next, NULL); - while (next != NULL) { + update_t *last = update; + update_t *next = update->next; + uint64_t min_active = 0; + if (IS_TAGGED(next, TAG2)) { + next = (update_t *)STRIP_TAG(next, TAG2); + min_active = (uint64_t)sl_min_key(active_); + if (next->version < min_active) { + + // Skip over aborted versions to verify the chain of updates is old enough for collection + update_t *temp = next; + while (temp->version == ABORTED_VERSION) { + assert(!IS_TAGGED(temp->version, TAG1)); + update_t *temp = next->next; + if (!IS_TAGGED(temp, TAG2)) + break; + temp = (update_t *)STRIP_TAG(temp, TAG2); + if (temp->version >= min_active) + return value; + temp = temp->next; + } + + // collect and all the update records following it + do { + next = SYNC_SWAP(&update->next, NULL); + + // if we find ourself in a race just back off and let the other thread take care of it + if (next == NULL) + return value; + update = next; - next = NULL; - if (update->next != NULL) { - next = SYNC_SWAP(&update->next, NULL); - } + next = next->next; nbd_free(update); - } + } while (IS_TAGGED(next, TAG2)); } } + + // If there is one item left and it is visible by all active transactions we can merge it into the map itself. + // There is no need for an update record. + if (next == NULL && last == (update_t *)STRIP_TAG(newest_update, TAG2)) { + if (min_active == UNDETERMINED_VERSION) { + min_active = (uint64_t)sl_min_key(active_); + } + if (last->version <= min_active) { + if (map_cas(txn->map, key, TAG_VALUE(last, TAG2), value) == TAG_VALUE(last, TAG2)) { + nbd_defer_free(last); + } + } + } - return update->value; + return value; } void tm_set (txn_t *txn, void *key, uint64_t value) { // create a new update record - update_rec_t *update = alloc_update_rec(); - update->type = UPDATE_TYPE_PUT; + update_t *update = alloc_update_rec(); update->value = value; - update->version = TAG_VALUE((uint64_t)txn); + update->version = TAG_VALUE(txn, TAG1); // push the new update record onto 's update list - uint64_t update_prev; + uint64_t old_update; do { - update->next = (update_rec_t *) map_get(txn->map, key); - update_prev = (uint64_t)update->next; - } while (map_cas(txn->map, key, update_prev, (uint64_t)update) != update_prev); + old_update = map_get(txn->map, key); + update->next = (update_t *)old_update; + } while (map_cas(txn->map, key, old_update, TAG_VALUE(update, TAG2)) != old_update); // add to the write set for commit-time validation if (txn->writes_count == txn->writes_size) { write_rec_t *w = nbd_malloc(sizeof(write_rec_t) * txn->writes_size * 2); memcpy(w, txn->writes, txn->writes_size * sizeof(write_rec_t)); txn->writes_size *= 2; + nbd_free(txn->writes); + txn->writes = w; } int i = txn->writes_count++; txn->writes[i].key = key;