X-Git-Url: https://pd.if.org/git/?p=nbds;a=blobdiff_plain;f=txn%2Ftxn.c;fp=txn%2Ftxn.c;h=a932f66a3d9f79b05121a3da9f776c55397c4ea4;hp=1cc8e2f84fdeff975d719bfada0608bed62c42bb;hb=a1d0b3ca99552878b1becf561d8f3291992aaa67;hpb=ef7c7fd495750e0d26762df9f1a297500553874f diff --git a/txn/txn.c b/txn/txn.c index 1cc8e2f..a932f66 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -1,4 +1,4 @@ -/* +/* * Written by Josh Dybnis and released to the public domain, as explained at * http://creativecommons.org/licenses/publicdomain */ @@ -6,24 +6,27 @@ #include "txn.h" #include "mem.h" #include "rcu.h" +#include "lwt.h" #include "skiplist.h" #define UNDETERMINED_VERSION 0 #define ABORTED_VERSION TAG_VALUE(0, TAG1) #define INITIAL_WRITES_SIZE 4 +#define PTR_TO_VAL(x) ((size_t)(x) >> 2) +#define VAL_TO_PTR(x) ((update_t *)((x) << 2)) typedef struct update_rec update_t; typedef map_key_t version_t; struct update_rec { - version_t version; + version_t version; // tagged versions are txn_t pointers, untagged are actual version numbers map_val_t value; map_val_t next; // an earlier update }; typedef struct write_rec { map_key_t key; - update_t *rec; + update_t *rec; } write_rec_t; struct txn { @@ -43,35 +46,35 @@ static version_t version_ = 1; static skiplist_t *active_ = NULL; -void txn_init (void) { +__attribute__ ((constructor)) void txn_init (void) { active_ = sl_alloc(NULL); } -// Validate the updates for . Validation fails if there is a write-write conflict. That is if after our +// Validate the updates for . Validation fails if there is a write-write conflict. That is if after our // read version another transaction committed a change to an entry we are also trying to change. // -// If we encounter a potential conflict with a transaction that is in the process of validating, we help it +// If we encounter a potential conflict with a transaction that is in the process of validating, we help it // complete validating. It must be finished before we can decide to rollback or commit. // static txn_state_e validate_key (txn_t *txn, map_key_t key) { assert(txn->state != TXN_RUNNING); - + map_val_t val = map_get(txn->map, key); update_t *update = NULL; for (; val != DOES_NOT_EXIST; val = update->next) { // If the update or its version is not tagged it means the update is committed. // - // We can stop at the first committed record we find that is at least as old as our read version. All - // the other committed records following it will be older. And all the uncommitted records following it + // We can stop at the first committed record we find that is at least as old as our read version. All + // the other committed records following it will be older. And all the uncommitted records following it // will eventually conflict with it and abort. if (!IS_TAGGED(val, TAG2)) return TXN_VALIDATED; - update = (update_t *)STRIP_TAG(val, TAG2); - if (!IS_TAGGED(update->version, TAG1)) + update = VAL_TO_PTR(val); + if (!IS_TAGGED(update->version, TAG1)) return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED; - // If the update's version is tagged then either the update was aborted or the the version number is + // If the update's version is tagged then either the update was aborted or the the version number is // actually a pointer to a running transaction's txn_t. // Skip aborted transactions. @@ -79,20 +82,20 @@ static txn_state_e validate_key (txn_t *txn, map_key_t key) { continue; // The update's transaction is still in progress. Access its txn_t. - txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1); + txn_t *writer = (txn_t *)VAL_TO_PTR(update->version); if (writer == txn) continue; // Skip our own updates. txn_state_e writer_state = writer->state; - // Any running transaction will only be able to acquire a wv greater than ours. A transaction changes its + // Any running transaction will only be able to acquire a wv greater than ours. A transaction changes its // state to validating before aquiring a wv. We can ignore an unvalidated transaction if its version is - // greater than ours. See next comment below for why. + // greater than ours. See the next comment below for the explination why. if (writer_state == TXN_RUNNING) - continue; - + continue; + // If has a later version than us we can safely ignore its updates. It will not commit until - // we have completed validation (in order to remain non-blocking it will help us validate if necessary). - // This protocol ensures a deterministic resolution to every conflict and avoids infinite ping-ponging + // we have completed validation (in order to remain non-blocking it will help us validate if necessary). + // This protocol ensures a deterministic resolution to every conflict and avoids infinite ping-ponging // between validating two conflicting transactions. if (writer_state == TXN_VALIDATING) { if (writer->wv > txn->wv) @@ -148,13 +151,16 @@ static txn_state_e txn_validate (txn_t *txn) { return txn->state; } -static update_t *alloc_update_rec (void) { +static update_t *alloc_update_rec (version_t ver, map_val_t val) { update_t *u = (update_t *)nbd_malloc(sizeof(update_t)); - memset(u, 0, sizeof(update_t)); + u->version = ver; + u->value = val; + u->next = DOES_NOT_EXIST; return u; } txn_t *txn_begin (map_t *map) { + TRACE("x1", "txn_begin: map %p", map, 0); txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t)); memset(txn, 0, sizeof(txn_t)); txn->wv = UNDETERMINED_VERSION; @@ -184,12 +190,13 @@ txn_t *txn_begin (map_t *map) { } while (temp != old_count); } while (1); + TRACE("x1", "txn_begin: returning new transaction %p (read version %p)", txn, txn->rv); return txn; } void txn_abort (txn_t *txn) { if (txn->state != TXN_RUNNING) - return; // TODO: return some sort of error code + return; int i; for (i = 0; i < txn->writes_count; ++i) { @@ -203,7 +210,7 @@ void txn_abort (txn_t *txn) { txn_state_e txn_commit (txn_t *txn) { if (txn->state != TXN_RUNNING) - return txn->state; // TODO: return some sort of error code + return txn->state; assert(txn->state == TXN_RUNNING); txn->state = TXN_VALIDATING; @@ -213,7 +220,7 @@ txn_state_e txn_commit (txn_t *txn) { version_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv; int i; for (i = 0; i < txn->writes_count; ++i) { - update_t *update = (update_t *)txn->writes[i].rec; + update_t *update = txn->writes[i].rec; update->version = wv; } @@ -237,100 +244,132 @@ txn_state_e txn_commit (txn_t *txn) { // Get most recent committed version prior to our read version. map_val_t txn_map_get (txn_t *txn, map_key_t key) { - if (txn->state != TXN_RUNNING) + TRACE("x1", "txn_map_get: txn %p map %p", txn, txn->map); + TRACE("x1", "txn_map_get: key %p", key, 0); + + if (txn->state != TXN_RUNNING) { + TRACE("x1", "txn_map_get: error txn not running (state %p)", txn->state, 0); return ERROR_TXN_NOT_RUNNING; + } - // Iterate through the update records to find the latest committed version prior to our read version. + // Iterate through the update records to find the latest committed version prior to our read version. map_val_t newest_val = map_get(txn->map, key); map_val_t val = newest_val; - update_t *update = NULL; - for ( ; ; val = update->next) { - - if (!IS_TAGGED(val, TAG2)) + update_t *update; + for ( ; (update = VAL_TO_PTR(val)) != NULL ; val = update->next) { + + // If TAG2 is set in it indicates that is an update record. Otherwise all the following are + // true: is a literal value, it is older than any currently active transaction, and it is the most + // recently set value for its key. Therefore it is visible to . + if (!IS_TAGGED(val, TAG2)) { + TRACE("x1", "txn_map_get: found untagged value; returning %p", val, 0); return val; - - update = (update_t *)STRIP_TAG(val, TAG2); - assert(update != NULL); + } // If the update's version is not tagged it means the update is committed. if (!IS_TAGGED(update->version, TAG1)) { - if (update->version <= txn->rv) + if (update->version <= txn->rv) { + TRACE("x2", "txn_map_get: found committed update %p (version %p)", update, update->version); break; // success + } + TRACE("x2", "txn_map_get: skipping update %p (version %p)", update, update->version); continue; } - // If the update's version is tagged then either the update was aborted or the the version number is + // If the update's version is tagged then either the update was aborted or the the version number is // actually a pointer to a running transaction's txn_t. // Skip updates from aborted transactions. - if (EXPECT_FALSE(update->version == ABORTED_VERSION)) + if (EXPECT_FALSE(update->version == ABORTED_VERSION)) { + TRACE("x2", "txn_map_get: skipping aborted update %p", update, 0); continue; + } // The update's transaction is still in progress. Access its txn_t. - txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1); - if (writer == txn) // found our own update - break; // success + txn_t *writer = (txn_t *)VAL_TO_PTR(update->version); + if (writer == txn) { + TRACE("x2", "txn_map_get: found txn's own update %p", update, 0); + break; // success + } txn_state_e writer_state = writer->state; - if (writer_state == TXN_RUNNING) - continue; + if (writer_state == TXN_RUNNING) { + TRACE("x2", "txn_map_get: skipping update %p of in-progress transaction %p", update, writer); + continue; + } if (writer_state == TXN_VALIDATING) { + TRACE("x2", "txn_map_get: update %p transaction %p validating", update, writer); if (writer->wv > txn->rv) continue; writer_state = txn_validate(writer); } // Skip updates from aborted transactions. - if (writer_state == TXN_ABORTED) + if (writer_state == TXN_ABORTED) { + TRACE("x2", "txn_map_get: skipping aborted update %p", update, 0); continue; + } assert(writer_state == TXN_VALIDATED); - if (writer->wv > txn->rv) + if (writer->wv > txn->rv) { + TRACE("x2", "txn_map_get: skipping update %p (version %p)", update, update->version); continue; + } break; // success } + if (update == NULL) { + TRACE("x1", "txn_map_get: key does not exist in map", key, 0); + return DOES_NOT_EXIST; + } + map_val_t value = update->value; + TRACE("x1", "txn_map_get: key found returning value %p", value, 0); + return value; // collect some garbage version_t min_active_version = UNDETERMINED_VERSION; update_t *next_update = NULL; if (IS_TAGGED(update->next, TAG2)) { - next_update = (update_t *)STRIP_TAG(update->next, TAG2); + next_update = VAL_TO_PTR(update->next); + + // If (and all update records following it [execpt if it is aborted]) is old enough + // that it is not visible to any active transaction we can safely free it. min_active_version = (version_t)sl_min_key(active_); if (next_update->version < min_active_version) { - // (and all update records following it [execpt if it is aborted]) is old enough that it is - // not visible to any active transaction. We can safely free it. - // Skip over aborted versions to look for more recent updates + // If the is aborted, skip over it to look for more recent ones that may follow update_t *temp = next_update; while (temp->version == ABORTED_VERSION) { assert(!IS_TAGGED(temp->version, TAG1)); - map_val_t next = next_update->next; + map_val_t next = temp->next; if (!IS_TAGGED(next, TAG2)) break; - temp = (update_t *)STRIP_TAG(next, TAG2); - if (temp->version >= min_active_version) + // Bail out of garbage collection if we find a record that might still be accessed by an + // ongoing transaction. + if (VAL_TO_PTR(next)->version >= min_active_version) return value; + + temp = VAL_TO_PTR(next); } - // free and all the update records following it + // free the next update record and all the ones following it temp = next_update; - while (1) { - map_val_t next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST); + map_val_t next; + do { + next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST); // if we find ourself in a race just back off and let the other thread take care of it - if (next == DOES_NOT_EXIST) + if (next == DOES_NOT_EXIST) return value; - if (!IS_TAGGED(next, TAG2)) - break; + nbd_free(temp); - temp = (update_t *)STRIP_TAG(next, TAG2); - nbd_free(update); - } + temp = VAL_TO_PTR(next); + + } while (IS_TAGGED(next, TAG2)); } } @@ -346,25 +385,36 @@ map_val_t txn_map_get (txn_t *txn, map_key_t key) { } } } - + return value; } void txn_map_set (txn_t *txn, map_key_t key, map_val_t value) { - if (txn->state != TXN_RUNNING) - return; // TODO: return some sort of error code + TRACE("x1", "txn_map_set: txn %p map %p", txn, txn->map); + TRACE("x1", "txn_map_set: key %p value %p", key, value); + assert(!IS_TAGGED(value, TAG1) && !IS_TAGGED(value, TAG2)); + + if (txn->state != TXN_RUNNING) { + TRACE("x1", "txn_map_set: error txn not running (state %p)", txn->state, 0); + return; + } // create a new update record - update_t *update = alloc_update_rec(); - update->value = value; - update->version = TAG_VALUE((version_t)txn, TAG1); + version_t ver = TAG_VALUE(PTR_TO_VAL(txn), TAG1); // tagged versions are txn_t pointers + update_t *update = alloc_update_rec(ver, value); // push the new update record onto 's update list - map_val_t old_update; + map_val_t old_update = map_get(txn->map, key); + TRACE("x2", "txn_map_set: old update %p new update record %p", old_update, update); do { - old_update = map_get(txn->map, key); update->next = old_update; - } while (map_cas(txn->map, key, old_update, TAG_VALUE((map_val_t)update, TAG2)) != old_update); + map_val_t temp = map_cas(txn->map, key, old_update, TAG_VALUE(PTR_TO_VAL(update), TAG2)); + if (temp == old_update) + break; + + TRACE("x1", "txn_map_set: cas failed; found %p expected %p", temp, old_update); + old_update = temp; + } while (1); // add to the write set for commit-time validation if (txn->writes_count == txn->writes_size) {