X-Git-Url: https://pd.if.org/git/?p=nbds;a=blobdiff_plain;f=txn%2Ftxn.c;h=e05d332386fe8284727fc0ced57c107d26a59cf2;hp=1073f7b6a24e3ae25fe5f124e7b11496f19604a5;hb=d26bac75802a324ed98c8d3d88cfb9eb87b3b35a;hpb=fa60fb1fb78136edc9e5863c0b86222f572963ef diff --git a/txn/txn.c b/txn/txn.c index 1073f7b..e05d332 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -5,6 +5,7 @@ #include "common.h" #include "txn.h" #include "mem.h" +#include "skiplist.h" #define UNDETERMINED_VERSION 0 #define INITIAL_WRITES_SIZE 4 @@ -21,7 +22,7 @@ struct update_rec { }; typedef struct write_rec { - const char *key; + void *key; update_rec_t *rec; } write_rec_t; @@ -33,20 +34,25 @@ struct txn { uint32_t writes_size; uint32_t writes_count; uint32_t writes_scan; - txn_access_e access; - txn_isolation_e isolation; + txn_type_e type; txn_state_e state; }; +static uint64_t version_ = 1; + static txn_state_e txn_validate (txn_t *txn); -static uint64_t version_ = 1; +static map_t *active_ = NULL; + +void txn_init (void) { + active_ = map_alloc(MAP_TYPE_SKIPLIST, NULL); +} // Validate the updates for . Validation fails for a key we have written to if there is a // write committed newer than our read version. -static txn_state_e tm_validate_key (txn_t *txn, const char *key, uint32_t key_len) { +static txn_state_e tm_validate_key (txn_t *txn, void *key) { - update_rec_t *update = (update_rec_t *) map_get(txn->map, key, key_len); + update_rec_t *update = (update_rec_t *) map_get(txn->map, key); for (; update != NULL; update = update->prev) { uint64_t writer_version = update->version; if (writer_version <= txn->rv) @@ -102,7 +108,7 @@ static txn_state_e txn_validate (txn_t *txn) { } for (i = 0; i < txn->writes_count; ++i) { - txn_state_e s = tm_validate_key(txn, txn->writes[i].key, strlen(txn->writes[i].key)); + txn_state_e s = tm_validate_key(txn, txn->writes[i].key); if (s == TXN_ABORTED) { txn->state = TXN_ABORTED; break; @@ -130,19 +136,39 @@ static update_rec_t *alloc_update_rec (void) { return u; } -txn_t *txn_begin (txn_access_e access, txn_isolation_e isolation, map_type_t map_type) { +txn_t *txn_begin (txn_type_e type, map_t *map) { txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t)); memset(txn, 0, sizeof(txn_t)); - txn->access = access; - txn->isolation = isolation; - txn->rv = version_; + txn->type = type; txn->wv = UNDETERMINED_VERSION; txn->state = TXN_RUNNING; - txn->map = map_alloc(map_type); - if (isolation != TXN_READ_ONLY) { + txn->map = map; + if (type != TXN_READ_ONLY) { txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE); txn->writes_size = INITIAL_WRITES_SIZE; } + + // aquire the read version for txn. + do { + txn->rv = version_; + + uint64_t old_count; + uint64_t temp = 0; + do { + old_count = temp; + temp = (uint64_t)map_cas(active_, (void *)txn->rv, old_count, old_count + 1); + } while (temp != old_count); + + if (txn->rv == version_) + break; + + temp = 1; + do { + old_count = temp; + temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1); + } while (temp != old_count); + } while (1); + return txn; } @@ -179,11 +205,11 @@ txn_state_e txn_commit (txn_t *txn) { } // Get most recent committed version prior to our read version. -uint64_t tm_get (txn_t *txn, const char *key, uint32_t key_len) { +uint64_t tm_get (txn_t *txn, void *key) { // Iterate through update records associated with to find the latest committed version. // We can use the first matching version. Older updates always come later in the list. - update_rec_t *update = (update_rec_t *) map_get(txn->map, key, key_len); + update_rec_t *update = (update_rec_t *) map_get(txn->map, key); for (; update != NULL; update = update->prev) { uint64_t writer_version = update->version; if (writer_version < txn->rv) @@ -213,7 +239,7 @@ uint64_t tm_get (txn_t *txn, const char *key, uint32_t key_len) { return DOES_NOT_EXIST; } -void tm_set (txn_t *txn, const char *key, uint32_t key_len, uint64_t value) { +void tm_set (txn_t *txn, void *key, uint64_t value) { // create a new update record update_rec_t *update = alloc_update_rec(); @@ -224,9 +250,9 @@ void tm_set (txn_t *txn, const char *key, uint32_t key_len, uint64_t value) { // push the new update record onto 's update list uint64_t update_prev; do { - update->prev = (update_rec_t *) map_get(txn->map, key, key_len); + update->prev = (update_rec_t *) map_get(txn->map, key); update_prev = (uint64_t)update->prev; - } while (map_cas(txn->map, key, key_len, update_prev, (uint64_t)update) != update_prev); + } while (map_cas(txn->map, key, update_prev, (uint64_t)update) != update_prev); // add to the write set for commit-time validation if (txn->writes_count == txn->writes_size) {