X-Git-Url: https://pd.if.org/git/?p=nbds;a=blobdiff_plain;f=txn%2Ftxn.c;h=5648d16abbba2b2d11b3f0b75bbbb267251746cb;hp=ef8313f56aaa5fe4c4796f4b200b21c727b9894d;hb=c7c053b290f15b7c8ba4f7327ead5a6fe836ec80;hpb=4ae7c1069667d8f067258d89676126f9b44226d6 diff --git a/txn/txn.c b/txn/txn.c index ef8313f..5648d16 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -12,9 +12,10 @@ #define INITIAL_WRITES_SIZE 4 typedef struct update_rec update_t; +typedef map_key_t version_t; struct update_rec { - uint64_t version; + version_t version; map_val_t value; map_val_t next; // an earlier update }; @@ -25,8 +26,8 @@ typedef struct write_rec { } write_rec_t; struct txn { - uint64_t rv; - uint64_t wv; + version_t rv; + version_t wv; map_t *map; write_rec_t *writes; size_t writes_size; @@ -37,7 +38,7 @@ struct txn { static txn_state_e txn_validate (txn_t *txn); -static uint64_t version_ = 1; +static version_t version_ = 1; static skiplist_t *active_ = NULL; @@ -118,7 +119,7 @@ static txn_state_e txn_validate (txn_t *txn) { case TXN_VALIDATING: if (txn->wv == UNDETERMINED_VERSION) { - uint64_t wv = SYNC_ADD(&version_, 1); + version_t wv = SYNC_ADD(&version_, 1); SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv); } @@ -165,11 +166,11 @@ txn_t *txn_begin (map_t *map) { do { txn->rv = version_; - uint64_t old_count; - uint64_t temp = 0; + unsigned old_count; + unsigned temp = 0; do { old_count = temp; - temp = (uint64_t)sl_cas(active_, (map_key_t)txn->rv, old_count, old_count + 1); + temp = sl_cas(active_, txn->rv, old_count, old_count + 1); } while (temp != old_count); if (txn->rv == version_) @@ -208,7 +209,7 @@ txn_state_e txn_commit (txn_t *txn) { txn_state_e state = txn_validate(txn); // Detach from its updates. - uint64_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv; + version_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv; int i; for (i = 0; i < txn->writes_count; ++i) { update_t *update = (update_t *)txn->writes[i].rec; @@ -216,8 +217,8 @@ txn_state_e txn_commit (txn_t *txn) { } // Lower the reference count for 's read version - uint64_t temp = 2; - uint64_t old_count; + unsigned temp = 2; + unsigned old_count; do { old_count = temp; temp = sl_cas(active_, (map_key_t)txn->rv, old_count, old_count - 1); @@ -292,11 +293,11 @@ map_val_t txn_map_get (txn_t *txn, map_key_t key) { map_val_t value = update->value; // collect some garbage - uint64_t min_active_version = UNDETERMINED_VERSION; + version_t min_active_version = UNDETERMINED_VERSION; update_t *next_update = NULL; if (IS_TAGGED(update->next, TAG2)) { next_update = (update_t *)STRIP_TAG(update->next, TAG2); - min_active_version = (uint64_t)sl_min_key(active_); + min_active_version = (version_t)sl_min_key(active_); if (next_update->version < min_active_version) { // (and all update records following it [execpt if it is aborted]) is old enough that it is // not visible to any active transaction. We can safely free it. @@ -305,7 +306,7 @@ map_val_t txn_map_get (txn_t *txn, map_key_t key) { update_t *temp = next_update; while (temp->version == ABORTED_VERSION) { assert(!IS_TAGGED(temp->version, TAG1)); - uint64_t next = next_update->next; + map_val_t next = next_update->next; if (!IS_TAGGED(next, TAG2)) break; @@ -317,7 +318,7 @@ map_val_t txn_map_get (txn_t *txn, map_key_t key) { // free and all the update records following it temp = next_update; while (1) { - uint64_t next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST); + map_val_t next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST); // if we find ourself in a race just back off and let the other thread take care of it if (next == DOES_NOT_EXIST) @@ -336,7 +337,7 @@ map_val_t txn_map_get (txn_t *txn, map_key_t key) { // There is no need for an update record. if (next_update == NULL && val == newest_val) { if (min_active_version == UNDETERMINED_VERSION) { - min_active_version = (uint64_t)sl_min_key(active_); + min_active_version = (version_t)sl_min_key(active_); } if (update->version <= min_active_version) { if (map_cas(txn->map, key, TAG_VALUE(val, TAG2), value) == TAG_VALUE(val, TAG2)) { @@ -355,14 +356,14 @@ void txn_map_set (txn_t *txn, map_key_t key, map_val_t value) { // create a new update record update_t *update = alloc_update_rec(); update->value = value; - update->version = TAG_VALUE((uint64_t)txn, TAG1); + update->version = TAG_VALUE((version_t)txn, TAG1); // push the new update record onto 's update list - uint64_t old_update; + map_val_t old_update; do { old_update = map_get(txn->map, key); update->next = old_update; - } while (map_cas(txn->map, key, old_update, TAG_VALUE((uint64_t)update, TAG2)) != old_update); + } while (map_cas(txn->map, key, old_update, TAG_VALUE((map_val_t)update, TAG2)) != old_update); // add to the write set for commit-time validation if (txn->writes_count == txn->writes_size) {