X-Git-Url: https://pd.if.org/git/?p=nbds;a=blobdiff_plain;f=txn%2Ftxn.c;h=ef8313f56aaa5fe4c4796f4b200b21c727b9894d;hp=2893d35dbcfb40ec5152775146f59224cb96034d;hb=4ae7c1069667d8f067258d89676126f9b44226d6;hpb=8c48c212e119ba28b0666c9ec3faa97cbd11ca7a diff --git a/txn/txn.c b/txn/txn.c index 2893d35..ef8313f 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -29,9 +29,9 @@ struct txn { uint64_t wv; map_t *map; write_rec_t *writes; - uint64_t writes_size; - uint64_t writes_count; - uint64_t writes_scan; + size_t writes_size; + size_t writes_count; + size_t writes_scan; txn_state_e state; }; @@ -65,7 +65,7 @@ static txn_state_e validate_key (txn_t *txn, map_key_t key) { // will eventually conflict with it and abort. if (!IS_TAGGED(val, TAG2)) return TXN_VALIDATED; - update = (update_t *)(size_t)STRIP_TAG(val, TAG2); + update = (update_t *)STRIP_TAG(val, TAG2); if (!IS_TAGGED(update->version, TAG1)) return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED; @@ -77,7 +77,7 @@ static txn_state_e validate_key (txn_t *txn, map_key_t key) { continue; // The update's transaction is still in progress. Access its txn_t. - txn_t *writer = (txn_t *)(size_t)STRIP_TAG(update->version, TAG1); + txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1); if (writer == txn) continue; // Skip our own updates. txn_state_e writer_state = writer->state; @@ -234,20 +234,20 @@ txn_state_e txn_commit (txn_t *txn) { } // Get most recent committed version prior to our read version. -uint64_t txn_map_get (txn_t *txn, map_key_t key) { +map_val_t txn_map_get (txn_t *txn, map_key_t key) { if (txn->state != TXN_RUNNING) return ERROR_TXN_NOT_RUNNING; // Iterate through the update records to find the latest committed version prior to our read version. - uint64_t newest_val = map_get(txn->map, key); - uint64_t val = newest_val; + map_val_t newest_val = map_get(txn->map, key); + map_val_t val = newest_val; update_t *update = NULL; for ( ; ; val = update->next) { if (!IS_TAGGED(val, TAG2)) return val; - update = (update_t *)(size_t)STRIP_TAG(val, TAG2); + update = (update_t *)STRIP_TAG(val, TAG2); assert(update != NULL); // If the update's version is not tagged it means the update is committed. @@ -265,7 +265,7 @@ uint64_t txn_map_get (txn_t *txn, map_key_t key) { continue; // The update's transaction is still in progress. Access its txn_t. - txn_t *writer = (txn_t *)(size_t)STRIP_TAG(update->version, TAG1); + txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1); if (writer == txn) // found our own update break; // success @@ -289,13 +289,13 @@ uint64_t txn_map_get (txn_t *txn, map_key_t key) { break; // success } - uint64_t value = update->value; + map_val_t value = update->value; // collect some garbage uint64_t min_active_version = UNDETERMINED_VERSION; update_t *next_update = NULL; if (IS_TAGGED(update->next, TAG2)) { - next_update = (update_t *)(size_t)STRIP_TAG(update->next, TAG2); + next_update = (update_t *)STRIP_TAG(update->next, TAG2); min_active_version = (uint64_t)sl_min_key(active_); if (next_update->version < min_active_version) { // (and all update records following it [execpt if it is aborted]) is old enough that it is @@ -309,7 +309,7 @@ uint64_t txn_map_get (txn_t *txn, map_key_t key) { if (!IS_TAGGED(next, TAG2)) break; - temp = (update_t *)(size_t)STRIP_TAG(next, TAG2); + temp = (update_t *)STRIP_TAG(next, TAG2); if (temp->version >= min_active_version) return value; } @@ -326,7 +326,7 @@ uint64_t txn_map_get (txn_t *txn, map_key_t key) { if (!IS_TAGGED(next, TAG2)) break; - temp = (update_t *)(size_t)STRIP_TAG(next, TAG2); + temp = (update_t *)STRIP_TAG(next, TAG2); nbd_free(update); } } @@ -348,21 +348,21 @@ uint64_t txn_map_get (txn_t *txn, map_key_t key) { return value; } -void txn_map_set (txn_t *txn, map_key_t key, uint64_t value) { +void txn_map_set (txn_t *txn, map_key_t key, map_val_t value) { if (txn->state != TXN_RUNNING) return; // TODO: return some sort of error code // create a new update record update_t *update = alloc_update_rec(); update->value = value; - update->version = TAG_VALUE((uint64_t)(size_t)txn, TAG1); + update->version = TAG_VALUE((uint64_t)txn, TAG1); // push the new update record onto 's update list uint64_t old_update; do { old_update = map_get(txn->map, key); update->next = old_update; - } while (map_cas(txn->map, key, old_update, TAG_VALUE((uint64_t)(size_t)update, TAG2)) != old_update); + } while (map_cas(txn->map, key, old_update, TAG_VALUE((uint64_t)update, TAG2)) != old_update); // add to the write set for commit-time validation if (txn->writes_count == txn->writes_size) {