X-Git-Url: https://pd.if.org/git/?p=nbds;a=blobdiff_plain;f=txn%2Ftxn.c;h=6e6b7973703f9b5792e6dbef772a1c6ab0895be3;hp=52758824e60dc3d9b7f2973364a12f81acccebf8;hb=b0f5164fac83e2ad24ab1f56d5c1f022279372ab;hpb=7378edffa5751159d35eab31eceb76a1f16231d0 diff --git a/txn/txn.c b/txn/txn.c index 5275882..6e6b797 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -5,8 +5,10 @@ #include "common.h" #include "txn.h" #include "mem.h" +#include "skiplist.h" #define UNDETERMINED_VERSION 0 +#define ABORTED_VERSION TAG_VALUE(0) #define INITIAL_WRITES_SIZE 4 typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t; @@ -17,11 +19,11 @@ struct update_rec { update_type_t type; uint64_t value; uint64_t version; - update_rec_t *prev; // a previous update + update_rec_t *next; // an earlier update }; typedef struct write_rec { - const char *key; + void *key; update_rec_t *rec; } write_rec_t; @@ -33,59 +35,76 @@ struct txn { uint32_t writes_size; uint32_t writes_count; uint32_t writes_scan; - txn_access_e access; - txn_isolation_e isolation; + txn_type_e type; txn_state_e state; }; -static txn_state_e txn_validate (txn_t *txn); - static uint64_t version_ = 1; -// Validate the updates for . Validation fails for a key we have written to if there is a -// write committed newer than our read version. -static txn_state_e tm_validate_key (txn_t *txn, const char *key, uint32_t key_len) { - - update_rec_t *update = (update_rec_t *) map_get(txn->map, key, key_len); - for (; update != NULL; update = update->prev) { - uint64_t writer_version = update->version; - if (writer_version <= txn->rv) - return TXN_VALIDATED; - - // If the version is tagged, it means it is a pointer to a transaction in progress. - if (IS_TAGGED(writer_version)) { - - // Skip aborted transactions. - if (EXPECT_FALSE(writer_version == TAG_VALUE(0))) - continue; +static txn_state_e txn_validate (txn_t *txn); - // Skip our own updates. - txn_t *writer = (txn_t *)STRIP_TAG(writer_version); - if (writer == txn) - continue; +static map_t *active_ = NULL; - writer_version = writer->wv; - if (writer_version <= txn->rv && writer_version != UNDETERMINED_VERSION) - return TXN_VALIDATED; +void txn_init (void) { + active_ = map_alloc(&sl_map_impl, NULL); +} - txn_state_e writer_state = writer->state; - if (EXPECT_FALSE(writer_state == TXN_ABORTED)) +// Validate the updates for . Validation fails if there is a write-write conflict. That is if after our +// read version another transaction committed a change to an entry we are also trying to change. +// +// If we encounter a potential conflict with a transaction that is in the process of validating, we help it +// complete validating. It must be finished before we can decide to rollback or commit. +// +static txn_state_e tm_validate_key (txn_t *txn, void *key) { + + update_rec_t *update = (update_rec_t *) map_get(txn->map, key); + for (; update != NULL; update = update->next) { + + // If the update's version is not tagged it means the update is committed. + // + // We can stop at the first committed record we find that is at least as old as our read version. All + // the other committed records following it will be older. And all the uncommitted records following it + // will eventually conflict with it and abort. + if (!IS_TAGGED(update->version)) + return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED; + + // If the update's version is tagged then either the update was aborted or the the version number is + // actually a pointer to a running transaction's txn_t. + + // Skip aborted transactions. + if (EXPECT_FALSE(update->version == ABORTED_VERSION)) + continue; + + // The update's transaction is still in progress. Access its txn_t. + txn_t *writer = (txn_t *)STRIP_TAG(update->version); + if (writer == txn) + continue; // Skip our own updates. + txn_state_e writer_state = writer->state; + + // Any running transaction will only be able to aquire a wv greater than ours. A transaction changes its + // state to validating before aquiring a wv. We can ignore an unvalidated transaction if its version is + // greater than ours. See next comment below for why. + if (writer_state == TXN_RUNNING) + continue; + + // If has a later version than us we can safely ignore its updates. It will not commit until + // we have completed validation (in order to remain non-blocking it will help us validate if necessary). + // This protocol ensures a deterministic resolution to every conflict and avoids infinite ping-ponging + // between validating two conflicting transactions. + if (writer_state == TXN_VALIDATING) { + if (writer->wv > txn->wv) continue; - - // Help validate if it is a committing transaction that might cause us to - // abort. However, if the has a later version than us we can safely ignore its - // updates. This protocol ensures a deterministic resolution to every conflict, and - // avoids infinite ping-ponging between validating two conflicting transactions. - if (writer_state == TXN_VALIDATING && (writer_version < txn->wv || - writer_version == UNDETERMINED_VERSION)) { - writer_state = txn_validate(writer); - } - - if (writer_state == TXN_VALIDATED) - return TXN_ABORTED; + // Help commit. We need to know if aborts or commits before we can decide what to + // do. But we don't want to block, so we assist. + writer_state = txn_validate(writer); } - return TXN_ABORTED; + // Skip updates from aborted transactions. + if (writer_state == TXN_ABORTED) + continue; + + assert(writer_state == TXN_VALIDATED); + return (writer->wv <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED; } return TXN_VALIDATED; @@ -102,7 +121,7 @@ static txn_state_e txn_validate (txn_t *txn) { } for (i = 0; i < txn->writes_count; ++i) { - txn_state_e s = tm_validate_key(txn, txn->writes[i].key, strlen(txn->writes[i].key)); + txn_state_e s = tm_validate_key(txn, txn->writes[i].key); if (s == TXN_ABORTED) { txn->state = TXN_ABORTED; break; @@ -130,19 +149,39 @@ static update_rec_t *alloc_update_rec (void) { return u; } -txn_t *txn_begin (txn_access_e access, txn_isolation_e isolation, map_t *map) { +txn_t *txn_begin (txn_type_e type, map_t *map) { txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t)); memset(txn, 0, sizeof(txn_t)); - txn->access = access; - txn->isolation = isolation; - txn->rv = version_; + txn->type = type; txn->wv = UNDETERMINED_VERSION; txn->state = TXN_RUNNING; txn->map = map; - if (isolation != TXN_READ_ONLY) { + if (type != TXN_READ_ONLY) { txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE); txn->writes_size = INITIAL_WRITES_SIZE; } + + // aquire the read version for txn. must be careful to avoid a race + do { + txn->rv = version_; + + uint64_t old_count; + uint64_t temp = 0; + do { + old_count = temp; + temp = (uint64_t)map_cas(active_, (void *)txn->rv, old_count, old_count + 1); + } while (temp != old_count); + + if (txn->rv == version_) + break; + + temp = 1; + do { + old_count = temp; + temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1); + } while (temp != old_count); + } while (1); + return txn; } @@ -151,7 +190,7 @@ void txn_abort (txn_t *txn) { int i; for (i = 0; i < txn->writes_count; ++i) { update_rec_t *update = (update_rec_t *)txn->writes[i].rec; - update->version = TAG_VALUE(0); + update->version = ABORTED_VERSION; } nbd_defer_free(txn->writes); @@ -165,13 +204,26 @@ txn_state_e txn_commit (txn_t *txn) { txn_state_e state = txn_validate(txn); // Detach from its updates. - uint64_t wv = (txn->state == TXN_ABORTED) ? TAG_VALUE(0) : txn->wv; + uint64_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv; int i; for (i = 0; i < txn->writes_count; ++i) { update_rec_t *update = (update_rec_t *)txn->writes[i].rec; update->version = wv; } + /* + // Lower the reference count for 's read version + uint64_t temp = 1; + uint64_t old_count; + do { + old_count = temp; + temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1); + } while (old_count != temp); + if (old_count == 0 && version_ != txn->rv) { + map_remove(active_, (void *)txn->rv); + } + */ + nbd_defer_free(txn->writes); nbd_defer_free(txn); @@ -179,41 +231,55 @@ txn_state_e txn_commit (txn_t *txn) { } // Get most recent committed version prior to our read version. -uint64_t tm_get (txn_t *txn, const char *key, uint32_t key_len) { - - // Iterate through update records associated with to find the latest committed version. - // We can use the first matching version. Older updates always come later in the list. - update_rec_t *update = (update_rec_t *) map_get(txn->map, key, key_len); - for (; update != NULL; update = update->prev) { - uint64_t writer_version = update->version; - if (writer_version < txn->rv) - return update->value; - - // If the version is tagged, it means that it is not a version number, but a pointer to an - // in progress transaction. - if (IS_TAGGED(update->version)) { - txn_t *writer = (txn_t *)STRIP_TAG(writer_version); - - if (writer == txn) - return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; - - // Skip updates from aborted transactions. - txn_state_e writer_state = writer->state; - if (EXPECT_FALSE(writer_state == TXN_ABORTED)) - continue; +uint64_t tm_get (txn_t *txn, void *key) { + + // Iterate through update records associated with to find the latest committed version prior to our + // read version. + update_rec_t *update = (update_rec_t *) map_get(txn->map, key); + for (; update != NULL; update = update->next) { + + // If the update's version is not tagged it means the update is committed. + if (!IS_TAGGED(update->version)) { + if (update->version <= txn->rv) + return update->value; + continue; + } - if (writer_state == TXN_VALIDATING) { - writer_state = txn_validate(writer); - } + // If the update's version is tagged then either the update was aborted or the the version number is + // actually a pointer to a running transaction's txn_t. + + // Skip updates from aborted transactions. + if (EXPECT_FALSE(update->version == ABORTED_VERSION)) + continue; + + // The update's transaction is still in progress. Access its txn_t. + txn_t *writer = (txn_t *)STRIP_TAG(update->version); + if (writer == txn) // found our own update + return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; - if (writer_state == TXN_VALIDATED && writer->wv <= txn->rv && writer->wv != UNDETERMINED_VERSION) - return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; + txn_state_e writer_state = writer->state; + if (writer_state == TXN_RUNNING) + continue; + + if (writer_state == TXN_VALIDATING) { + if (writer->wv > txn->rv) + continue; + writer_state = txn_validate(writer); } + + // Skip updates from aborted transactions. + if (writer_state == TXN_ABORTED) + continue; + + assert(writer_state == TXN_VALIDATED); + if (writer->wv > txn->rv) + continue; + return update->value; } return DOES_NOT_EXIST; } -void tm_set (txn_t *txn, const char *key, uint32_t key_len, uint64_t value) { +void tm_set (txn_t *txn, void *key, uint64_t value) { // create a new update record update_rec_t *update = alloc_update_rec(); @@ -224,9 +290,9 @@ void tm_set (txn_t *txn, const char *key, uint32_t key_len, uint64_t value) { // push the new update record onto 's update list uint64_t update_prev; do { - update->prev = (update_rec_t *) map_get(txn->map, key, key_len); - update_prev = (uint64_t)update->prev; - } while (map_cas(txn->map, key, key_len, update_prev, (uint64_t)update) != update_prev); + update->next = (update_rec_t *) map_get(txn->map, key); + update_prev = (uint64_t)update->next; + } while (map_cas(txn->map, key, update_prev, (uint64_t)update) != update_prev); // add to the write set for commit-time validation if (txn->writes_count == txn->writes_size) {