X-Git-Url: https://pd.if.org/git/?p=nbds;a=blobdiff_plain;f=txn%2Ftxn.c;h=e05d332386fe8284727fc0ced57c107d26a59cf2;hp=8f5437015692c2aad9246a181c30c7d6bca47da7;hb=d26bac75802a324ed98c8d3d88cfb9eb87b3b35a;hpb=a1fae129c758d7ea83dfdbb5f14ec1df12f0aa34 diff --git a/txn/txn.c b/txn/txn.c index 8f54370..e05d332 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -5,11 +5,11 @@ #include "common.h" #include "txn.h" #include "mem.h" +#include "skiplist.h" #define UNDETERMINED_VERSION 0 #define INITIAL_WRITES_SIZE 4 -typedef enum { TXN_RUNNING, TXN_VALIDATING, TXN_VALIDATED, TXN_ABORTED } txn_state_t; typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t; typedef struct update_rec update_rec_t; @@ -22,90 +22,37 @@ struct update_rec { }; typedef struct write_rec { - const char *key; + void *key; update_rec_t *rec; } write_rec_t; struct txn { uint64_t rv; uint64_t wv; - hashtable_t *ht; + map_t *map; write_rec_t *writes; uint32_t writes_size; uint32_t writes_count; uint32_t writes_scan; - txn_access_t access; - txn_isolation_t isolation; - txn_state_t state; + txn_type_e type; + txn_state_e state; }; -uint64_t GlobalVersion = 1; -uint64_t MinActiveTxnVersion = 0; +static uint64_t version_ = 1; -static txn_state_t txn_validate (txn_t *txn); +static txn_state_e txn_validate (txn_t *txn); -update_rec_t *alloc_update_rec (void) { - update_rec_t *u = (update_rec_t *)nbd_malloc(sizeof(update_rec_t)); - memset(u, 0, sizeof(update_rec_t)); - return u; -} - -txn_t *txn_begin (txn_access_t access, txn_isolation_t isolation, hashtable_t *ht) { - txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t)); - memset(txn, 0, sizeof(txn_t)); - txn->access = access; - txn->isolation = isolation; - txn->rv = GlobalVersion; - txn->wv = UNDETERMINED_VERSION; - txn->state = TXN_RUNNING; - txn->ht = ht; - if (isolation != TXN_READ_ONLY) { - txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE); - txn->writes_size = INITIAL_WRITES_SIZE; - } - return txn; -} - -// Get most recent committed version prior to our read version. -int64_t txn_ht_get (txn_t *txn, const char *key, uint32_t key_len) { - - // Iterate through update records associated with to find the latest committed version. - // We can use the first matching version. Older updates always come later in the list. - update_rec_t *update = (update_rec_t *) ht_get(txn->ht, key, key_len); - for (; update != NULL; update = update->prev) { - uint64_t writer_version = update->version; - if (writer_version < txn->rv) - return update->value; - - // If the version is tagged, it means that it is not a version number, but a pointer to an - // in progress transaction. - if (IS_TAGGED(update->version)) { - txn_t *writer = (txn_t *)STRIP_TAG(writer_version); +static map_t *active_ = NULL; - if (writer == txn) - return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; - - // Skip updates from aborted transactions. - txn_state_t writer_state = writer->state; - if (EXPECT_FALSE(writer_state == TXN_ABORTED)) - continue; - - if (writer_state == TXN_VALIDATING) { - writer_state = txn_validate(writer); - } - - if (writer_state == TXN_VALIDATED && writer->wv <= txn->rv && writer->wv != UNDETERMINED_VERSION) - return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; - } - } - return DOES_NOT_EXIST; +void txn_init (void) { + active_ = map_alloc(MAP_TYPE_SKIPLIST, NULL); } // Validate the updates for . Validation fails for a key we have written to if there is a // write committed newer than our read version. -static txn_state_t txn_ht_validate_key (txn_t *txn, const char *key, uint32_t key_len) { +static txn_state_e tm_validate_key (txn_t *txn, void *key) { - update_rec_t *update = (update_rec_t *) ht_get(txn->ht, key, key_len); + update_rec_t *update = (update_rec_t *) map_get(txn->map, key); for (; update != NULL; update = update->prev) { uint64_t writer_version = update->version; if (writer_version <= txn->rv) @@ -127,7 +74,7 @@ static txn_state_t txn_ht_validate_key (txn_t *txn, const char *key, uint32_t ke if (writer_version <= txn->rv && writer_version != UNDETERMINED_VERSION) return TXN_VALIDATED; - txn_state_t writer_state = writer->state; + txn_state_e writer_state = writer->state; if (EXPECT_FALSE(writer_state == TXN_ABORTED)) continue; @@ -150,18 +97,18 @@ static txn_state_t txn_ht_validate_key (txn_t *txn, const char *key, uint32_t ke return TXN_VALIDATED; } -static txn_state_t txn_validate (txn_t *txn) { +static txn_state_e txn_validate (txn_t *txn) { int i; switch (txn->state) { case TXN_VALIDATING: if (txn->wv == UNDETERMINED_VERSION) { - uint64_t wv = SYNC_ADD(&GlobalVersion, 1); + uint64_t wv = SYNC_ADD(&version_, 1); SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv); } for (i = 0; i < txn->writes_count; ++i) { - txn_state_t s = txn_ht_validate_key(txn, txn->writes[i].key, strlen(txn->writes[i].key)); + txn_state_e s = tm_validate_key(txn, txn->writes[i].key); if (s == TXN_ABORTED) { txn->state = TXN_ABORTED; break; @@ -183,6 +130,48 @@ static txn_state_t txn_validate (txn_t *txn) { return txn->state; } +static update_rec_t *alloc_update_rec (void) { + update_rec_t *u = (update_rec_t *)nbd_malloc(sizeof(update_rec_t)); + memset(u, 0, sizeof(update_rec_t)); + return u; +} + +txn_t *txn_begin (txn_type_e type, map_t *map) { + txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t)); + memset(txn, 0, sizeof(txn_t)); + txn->type = type; + txn->wv = UNDETERMINED_VERSION; + txn->state = TXN_RUNNING; + txn->map = map; + if (type != TXN_READ_ONLY) { + txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE); + txn->writes_size = INITIAL_WRITES_SIZE; + } + + // aquire the read version for txn. + do { + txn->rv = version_; + + uint64_t old_count; + uint64_t temp = 0; + do { + old_count = temp; + temp = (uint64_t)map_cas(active_, (void *)txn->rv, old_count, old_count + 1); + } while (temp != old_count); + + if (txn->rv == version_) + break; + + temp = 1; + do { + old_count = temp; + temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1); + } while (temp != old_count); + } while (1); + + return txn; +} + void txn_abort (txn_t *txn) { int i; @@ -195,11 +184,11 @@ void txn_abort (txn_t *txn) { nbd_defer_free(txn); } -txn_state_t txn_commit (txn_t *txn) { +txn_state_e txn_commit (txn_t *txn) { assert(txn->state == TXN_RUNNING); txn->state = TXN_VALIDATING; - txn_state_t state = txn_validate(txn); + txn_state_e state = txn_validate(txn); // Detach from its updates. uint64_t wv = (txn->state == TXN_ABORTED) ? TAG_VALUE(0) : txn->wv; @@ -215,7 +204,42 @@ txn_state_t txn_commit (txn_t *txn) { return state; } -void txn_ht_put (txn_t *txn, const char *key, uint32_t key_len, int64_t value) { +// Get most recent committed version prior to our read version. +uint64_t tm_get (txn_t *txn, void *key) { + + // Iterate through update records associated with to find the latest committed version. + // We can use the first matching version. Older updates always come later in the list. + update_rec_t *update = (update_rec_t *) map_get(txn->map, key); + for (; update != NULL; update = update->prev) { + uint64_t writer_version = update->version; + if (writer_version < txn->rv) + return update->value; + + // If the version is tagged, it means that it is not a version number, but a pointer to an + // in progress transaction. + if (IS_TAGGED(update->version)) { + txn_t *writer = (txn_t *)STRIP_TAG(writer_version); + + if (writer == txn) + return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; + + // Skip updates from aborted transactions. + txn_state_e writer_state = writer->state; + if (EXPECT_FALSE(writer_state == TXN_ABORTED)) + continue; + + if (writer_state == TXN_VALIDATING) { + writer_state = txn_validate(writer); + } + + if (writer_state == TXN_VALIDATED && writer->wv <= txn->rv && writer->wv != UNDETERMINED_VERSION) + return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; + } + } + return DOES_NOT_EXIST; +} + +void tm_set (txn_t *txn, void *key, uint64_t value) { // create a new update record update_rec_t *update = alloc_update_rec(); @@ -224,11 +248,11 @@ void txn_ht_put (txn_t *txn, const char *key, uint32_t key_len, int64_t value) { update->version = TAG_VALUE((uint64_t)txn); // push the new update record onto 's update list - int64_t update_prev; + uint64_t update_prev; do { - update->prev = (update_rec_t *) ht_get(txn->ht, key, key_len); - update_prev = (int64_t)update->prev; - } while (ht_compare_and_set(txn->ht, key, key_len, update_prev, (int64_t)update) != update_prev); + update->prev = (update_rec_t *) map_get(txn->map, key); + update_prev = (uint64_t)update->prev; + } while (map_cas(txn->map, key, update_prev, (uint64_t)update) != update_prev); // add to the write set for commit-time validation if (txn->writes_count == txn->writes_size) { @@ -240,11 +264,3 @@ void txn_ht_put (txn_t *txn, const char *key, uint32_t key_len, int64_t value) { txn->writes[i].key = key; txn->writes[i].rec = update; } - -#ifdef MAKE_txn_test -#include "runtime.h" -int main (void) { - nbd_init(); - return 0; -} -#endif//txn_test