X-Git-Url: https://pd.if.org/git/?p=nbds;a=blobdiff_plain;f=txn%2Ftxn.c;h=1cc8e2f84fdeff975d719bfada0608bed62c42bb;hp=ea3d6df4ccbb5729fc0447218979b0bafbee703e;hb=f3eb4799a11ceaeb47ab02034595b5d641c2f1c9;hpb=dbcd4739e02b8e774e28b752c412d7e2f242cd47 diff --git a/txn/txn.c b/txn/txn.c index ea3d6df..1cc8e2f 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -5,6 +5,7 @@ #include "common.h" #include "txn.h" #include "mem.h" +#include "rcu.h" #include "skiplist.h" #define UNDETERMINED_VERSION 0 @@ -12,34 +13,34 @@ #define INITIAL_WRITES_SIZE 4 typedef struct update_rec update_t; +typedef map_key_t version_t; struct update_rec { - update_t *next; // an earlier update - uint64_t version; - uint64_t value; + version_t version; + map_val_t value; + map_val_t next; // an earlier update }; typedef struct write_rec { - void *key; + map_key_t key; update_t *rec; } write_rec_t; struct txn { - uint64_t rv; - uint64_t wv; + version_t rv; + version_t wv; map_t *map; write_rec_t *writes; - uint32_t writes_size; - uint32_t writes_count; - uint32_t writes_scan; - txn_type_e type; + size_t writes_size; + size_t writes_count; + size_t writes_scan; txn_state_e state; }; -static uint64_t version_ = 1; - static txn_state_e txn_validate (txn_t *txn); +static version_t version_ = 1; + static skiplist_t *active_ = NULL; void txn_init (void) { @@ -52,19 +53,21 @@ void txn_init (void) { // If we encounter a potential conflict with a transaction that is in the process of validating, we help it // complete validating. It must be finished before we can decide to rollback or commit. // -static txn_state_e tm_validate_key (txn_t *txn, void *key) { +static txn_state_e validate_key (txn_t *txn, map_key_t key) { + assert(txn->state != TXN_RUNNING); - update_t *update = (update_t *) map_get(txn->map, key); - for (; update != NULL; update = update->next) { + map_val_t val = map_get(txn->map, key); + update_t *update = NULL; + for (; val != DOES_NOT_EXIST; val = update->next) { // If the update or its version is not tagged it means the update is committed. // // We can stop at the first committed record we find that is at least as old as our read version. All // the other committed records following it will be older. And all the uncommitted records following it // will eventually conflict with it and abort. - if (!IS_TAGGED(update, TAG2)) + if (!IS_TAGGED(val, TAG2)) return TXN_VALIDATED; - update = (update_t *)STRIP_TAG(update, TAG2); + update = (update_t *)STRIP_TAG(val, TAG2); if (!IS_TAGGED(update->version, TAG1)) return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED; @@ -111,21 +114,23 @@ static txn_state_e tm_validate_key (txn_t *txn, void *key) { } static txn_state_e txn_validate (txn_t *txn) { + assert(txn->state != TXN_RUNNING); int i; switch (txn->state) { case TXN_VALIDATING: if (txn->wv == UNDETERMINED_VERSION) { - uint64_t wv = SYNC_ADD(&version_, 1); + version_t wv = SYNC_ADD(&version_, 1); SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv); } for (i = 0; i < txn->writes_count; ++i) { - txn_state_e s = tm_validate_key(txn, txn->writes[i].key); + txn_state_e s = validate_key(txn, txn->writes[i].key); if (s == TXN_ABORTED) { txn->state = TXN_ABORTED; break; } + assert(s == TXN_VALIDATED); } if (txn->state == TXN_VALIDATING) { txn->state = TXN_VALIDATED; @@ -149,27 +154,24 @@ static update_t *alloc_update_rec (void) { return u; } -txn_t *txn_begin (txn_type_e type, map_t *map) { +txn_t *txn_begin (map_t *map) { txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t)); memset(txn, 0, sizeof(txn_t)); - txn->type = type; txn->wv = UNDETERMINED_VERSION; txn->state = TXN_RUNNING; txn->map = map; - if (type != TXN_READ_ONLY) { - txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE); - txn->writes_size = INITIAL_WRITES_SIZE; - } + txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE); + txn->writes_size = INITIAL_WRITES_SIZE; // acquire the read version for txn. must be careful to avoid a race do { txn->rv = version_; - uint64_t old_count; - uint64_t temp = 0; + unsigned old_count; + unsigned temp = 0; do { old_count = temp; - temp = (uint64_t)sl_cas(active_, (void *)txn->rv, old_count, old_count + 1); + temp = sl_cas(active_, txn->rv, old_count, old_count + 1); } while (temp != old_count); if (txn->rv == version_) @@ -178,7 +180,7 @@ txn_t *txn_begin (txn_type_e type, map_t *map) { temp = 1; do { old_count = temp; - temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1); + temp = sl_cas(active_, (map_key_t)txn->rv, old_count, old_count - 1); } while (temp != old_count); } while (1); @@ -186,6 +188,8 @@ txn_t *txn_begin (txn_type_e type, map_t *map) { } void txn_abort (txn_t *txn) { + if (txn->state != TXN_RUNNING) + return; // TODO: return some sort of error code int i; for (i = 0; i < txn->writes_count; ++i) { @@ -193,18 +197,20 @@ void txn_abort (txn_t *txn) { update->version = ABORTED_VERSION; } - nbd_defer_free(txn->writes); - nbd_defer_free(txn); + rcu_defer_free(txn->writes); + rcu_defer_free(txn); } txn_state_e txn_commit (txn_t *txn) { + if (txn->state != TXN_RUNNING) + return txn->state; // TODO: return some sort of error code assert(txn->state == TXN_RUNNING); txn->state = TXN_VALIDATING; txn_state_e state = txn_validate(txn); // Detach from its updates. - uint64_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv; + version_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv; int i; for (i = 0; i < txn->writes_count; ++i) { update_t *update = (update_t *)txn->writes[i].rec; @@ -212,38 +218,38 @@ txn_state_e txn_commit (txn_t *txn) { } // Lower the reference count for 's read version - uint64_t temp = 2; - uint64_t old_count; + unsigned temp = 2; + unsigned old_count; do { old_count = temp; - temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1); + temp = sl_cas(active_, (map_key_t)txn->rv, old_count, old_count - 1); if (temp == 1 && txn->rv != version_) { - sl_remove(active_, (void *)txn->rv); + sl_remove(active_, (map_key_t)txn->rv); break; } } while (old_count != temp); - nbd_defer_free(txn->writes); - nbd_defer_free(txn); + rcu_defer_free(txn->writes); + rcu_defer_free(txn); return state; } // Get most recent committed version prior to our read version. -uint64_t tm_get (txn_t *txn, void *key) { - - update_t *newest_update = (update_t *) map_get(txn->map, key); - if (!IS_TAGGED(newest_update, TAG2)) - return (uint64_t)newest_update; +map_val_t txn_map_get (txn_t *txn, map_key_t key) { + if (txn->state != TXN_RUNNING) + return ERROR_TXN_NOT_RUNNING; // Iterate through the update records to find the latest committed version prior to our read version. - update_t *update; - for (update = newest_update; ; update = update->next) { + map_val_t newest_val = map_get(txn->map, key); + map_val_t val = newest_val; + update_t *update = NULL; + for ( ; ; val = update->next) { - if (!IS_TAGGED(update, TAG2)) - return (uint64_t)update; + if (!IS_TAGGED(val, TAG2)) + return val; - update = (update_t *)STRIP_TAG(update, TAG2); + update = (update_t *)STRIP_TAG(val, TAG2); assert(update != NULL); // If the update's version is not tagged it means the update is committed. @@ -285,74 +291,80 @@ uint64_t tm_get (txn_t *txn, void *key) { break; // success } - uint64_t value = update->value; + map_val_t value = update->value; // collect some garbage - update_t *last = update; - update_t *next = update->next; - uint64_t min_active = 0; - if (IS_TAGGED(next, TAG2)) { - next = (update_t *)STRIP_TAG(next, TAG2); - min_active = (uint64_t)sl_min_key(active_); - if (next->version < min_active) { - - // Skip over aborted versions to verify the chain of updates is old enough for collection - update_t *temp = next; + version_t min_active_version = UNDETERMINED_VERSION; + update_t *next_update = NULL; + if (IS_TAGGED(update->next, TAG2)) { + next_update = (update_t *)STRIP_TAG(update->next, TAG2); + min_active_version = (version_t)sl_min_key(active_); + if (next_update->version < min_active_version) { + // (and all update records following it [execpt if it is aborted]) is old enough that it is + // not visible to any active transaction. We can safely free it. + + // Skip over aborted versions to look for more recent updates + update_t *temp = next_update; while (temp->version == ABORTED_VERSION) { assert(!IS_TAGGED(temp->version, TAG1)); - update_t *temp = next->next; - if (!IS_TAGGED(temp, TAG2)) + map_val_t next = next_update->next; + if (!IS_TAGGED(next, TAG2)) break; - temp = (update_t *)STRIP_TAG(temp, TAG2); - if (temp->version >= min_active) + + temp = (update_t *)STRIP_TAG(next, TAG2); + if (temp->version >= min_active_version) return value; - temp = temp->next; } - // collect and all the update records following it - do { - next = SYNC_SWAP(&update->next, NULL); + // free and all the update records following it + temp = next_update; + while (1) { + map_val_t next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST); // if we find ourself in a race just back off and let the other thread take care of it - if (next == NULL) + if (next == DOES_NOT_EXIST) return value; - update = next; - next = next->next; + if (!IS_TAGGED(next, TAG2)) + break; + + temp = (update_t *)STRIP_TAG(next, TAG2); nbd_free(update); - } while (IS_TAGGED(next, TAG2)); + } } } // If there is one item left and it is visible by all active transactions we can merge it into the map itself. // There is no need for an update record. - if (next == NULL && last == (update_t *)STRIP_TAG(newest_update, TAG2)) { - if (min_active == UNDETERMINED_VERSION) { - min_active = (uint64_t)sl_min_key(active_); + if (next_update == NULL && val == newest_val) { + if (min_active_version == UNDETERMINED_VERSION) { + min_active_version = (version_t)sl_min_key(active_); } - if (last->version <= min_active) { - if (map_cas(txn->map, key, TAG_VALUE(last, TAG2), value) == TAG_VALUE(last, TAG2)) { - nbd_defer_free(last); + if (update->version <= min_active_version) { + if (map_cas(txn->map, key, TAG_VALUE(val, TAG2), value) == TAG_VALUE(val, TAG2)) { + rcu_defer_free(update); } } - } + } return value; } -void tm_set (txn_t *txn, void *key, uint64_t value) { +void txn_map_set (txn_t *txn, map_key_t key, map_val_t value) { + if (txn->state != TXN_RUNNING) + return; // TODO: return some sort of error code // create a new update record update_t *update = alloc_update_rec(); update->value = value; - update->version = TAG_VALUE(txn, TAG1); + update->version = TAG_VALUE((version_t)txn, TAG1); // push the new update record onto 's update list - uint64_t old_update; + map_val_t old_update; do { old_update = map_get(txn->map, key); - update->next = (update_t *)old_update; - } while (map_cas(txn->map, key, old_update, TAG_VALUE(update, TAG2)) != old_update); + update->next = old_update; + } while (map_cas(txn->map, key, old_update, TAG_VALUE((map_val_t)update, TAG2)) != old_update); // add to the write set for commit-time validation if (txn->writes_count == txn->writes_size) {