From b0f5164fac83e2ad24ab1f56d5c1f022279372ab Mon Sep 17 00:00:00 2001 From: jdybnis Date: Sun, 7 Dec 2008 08:17:53 +0000 Subject: [PATCH] fix error in validation logic fix compile failure in list.c --- map/list.c | 4 +- test/txn_test.c | 14 ++-- todo | 4 +- txn/txn.c | 176 +++++++++++++++++++++++++++++------------------- 4 files changed, 120 insertions(+), 78 deletions(-) diff --git a/map/list.c b/map/list.c index ac6b6a5..0982e59 100644 --- a/map/list.c +++ b/map/list.c @@ -52,7 +52,9 @@ uint64_t ll_count (list_t *ll) { uint64_t count = 0; node_t *item = ll->head->next; while (item) { - count++; + if (!IS_TAGGED(item->next)) { + count++; + } item = (node_t *)STRIP_TAG(item->next); } return count; diff --git a/test/txn_test.c b/test/txn_test.c index a898e9f..74d5cee 100644 --- a/test/txn_test.c +++ b/test/txn_test.c @@ -13,12 +13,13 @@ void test1 (CuTest* tc) { map_t *map = map_alloc(&ht_map_impl, NULL); txn_t *t1 = txn_begin(TXN_REPEATABLE_READ, map); txn_t *t2 = txn_begin(TXN_REPEATABLE_READ, map); - tm_set(t1, "abc", 2); - tm_set(t1, "abc", 3); - ASSERT_EQUAL( DOES_NOT_EXIST, tm_get(t2, "abc") ); - tm_set(t2, "abc", 4); - ASSERT_EQUAL( 3, tm_get(t1, "abc") ); - ASSERT_EQUAL( 4, tm_get(t2, "abc") ); + void *k1 = (void *)1; + tm_set(t1, k1, 2); + tm_set(t1, k1, 3); + ASSERT_EQUAL( DOES_NOT_EXIST, tm_get(t2, k1) ); + tm_set(t2, k1, 4); + ASSERT_EQUAL( 3, tm_get(t1, k1) ); + ASSERT_EQUAL( 4, tm_get(t2, k1) ); ASSERT_EQUAL( TXN_VALIDATED, txn_commit(t2)); ASSERT_EQUAL( TXN_ABORTED, txn_commit(t1)); } @@ -37,4 +38,3 @@ int main (void) { return 0; } - diff --git a/todo b/todo index 75f997c..fef96ad 100644 --- a/todo +++ b/todo @@ -1,4 +1,4 @@ -- make rcu wait when its buffer gets full, instead of throwing an assert +- make rcu try yielding when its buffer gets full, instead of throwing an assert + fix makefile to compute dependency info as a side-effect of compilation (-MF) - investigate 16 byte CAS; ht can store GUIDs inline instead of pointers to actual keys - testing, testing, testing @@ -9,7 +9,7 @@ + make the interfaces for all data structures consistent + make list and skiplist use string keys + optimize integer keys -- ht_print() ++ ht_print() - iterators - characterize performance of data structures - experiment with the performance impact of not passing the hash between functions diff --git a/txn/txn.c b/txn/txn.c index a6366b3..6e6b797 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -8,6 +8,7 @@ #include "skiplist.h" #define UNDETERMINED_VERSION 0 +#define ABORTED_VERSION TAG_VALUE(0) #define INITIAL_WRITES_SIZE 4 typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t; @@ -18,7 +19,7 @@ struct update_rec { update_type_t type; uint64_t value; uint64_t version; - update_rec_t *prev; // a previous update + update_rec_t *next; // an earlier update }; typedef struct write_rec { @@ -48,50 +49,62 @@ void txn_init (void) { active_ = map_alloc(&sl_map_impl, NULL); } -// Validate the updates for . Validation fails for a key we have written to if there is a -// write committed newer than our read version. +// Validate the updates for . Validation fails if there is a write-write conflict. That is if after our +// read version another transaction committed a change to an entry we are also trying to change. +// +// If we encounter a potential conflict with a transaction that is in the process of validating, we help it +// complete validating. It must be finished before we can decide to rollback or commit. +// static txn_state_e tm_validate_key (txn_t *txn, void *key) { update_rec_t *update = (update_rec_t *) map_get(txn->map, key); - for (; update != NULL; update = update->prev) { - uint64_t writer_version = update->version; - if (writer_version <= txn->rv) - return TXN_VALIDATED; - - // If the version is tagged, it means it is a pointer to a transaction in progress. - if (IS_TAGGED(writer_version)) { - - // Skip aborted transactions. - if (EXPECT_FALSE(writer_version == TAG_VALUE(0))) - continue; - - // Skip our own updates. - txn_t *writer = (txn_t *)STRIP_TAG(writer_version); - if (writer == txn) - continue; - - writer_version = writer->wv; - if (writer_version <= txn->rv && writer_version != UNDETERMINED_VERSION) - return TXN_VALIDATED; - - txn_state_e writer_state = writer->state; - if (EXPECT_FALSE(writer_state == TXN_ABORTED)) + for (; update != NULL; update = update->next) { + + // If the update's version is not tagged it means the update is committed. + // + // We can stop at the first committed record we find that is at least as old as our read version. All + // the other committed records following it will be older. And all the uncommitted records following it + // will eventually conflict with it and abort. + if (!IS_TAGGED(update->version)) + return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED; + + // If the update's version is tagged then either the update was aborted or the the version number is + // actually a pointer to a running transaction's txn_t. + + // Skip aborted transactions. + if (EXPECT_FALSE(update->version == ABORTED_VERSION)) + continue; + + // The update's transaction is still in progress. Access its txn_t. + txn_t *writer = (txn_t *)STRIP_TAG(update->version); + if (writer == txn) + continue; // Skip our own updates. + txn_state_e writer_state = writer->state; + + // Any running transaction will only be able to aquire a wv greater than ours. A transaction changes its + // state to validating before aquiring a wv. We can ignore an unvalidated transaction if its version is + // greater than ours. See next comment below for why. + if (writer_state == TXN_RUNNING) + continue; + + // If has a later version than us we can safely ignore its updates. It will not commit until + // we have completed validation (in order to remain non-blocking it will help us validate if necessary). + // This protocol ensures a deterministic resolution to every conflict and avoids infinite ping-ponging + // between validating two conflicting transactions. + if (writer_state == TXN_VALIDATING) { + if (writer->wv > txn->wv) continue; - - // Help validate if it is a committing transaction that might cause us to - // abort. However, if the has a later version than us we can safely ignore its - // updates. This protocol ensures a deterministic resolution to every conflict, and - // avoids infinite ping-ponging between validating two conflicting transactions. - if (writer_state == TXN_VALIDATING && (writer_version < txn->wv || - writer_version == UNDETERMINED_VERSION)) { - writer_state = txn_validate(writer); - } - - if (writer_state == TXN_VALIDATED) - return TXN_ABORTED; + // Help commit. We need to know if aborts or commits before we can decide what to + // do. But we don't want to block, so we assist. + writer_state = txn_validate(writer); } - return TXN_ABORTED; + // Skip updates from aborted transactions. + if (writer_state == TXN_ABORTED) + continue; + + assert(writer_state == TXN_VALIDATED); + return (writer->wv <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED; } return TXN_VALIDATED; @@ -148,7 +161,7 @@ txn_t *txn_begin (txn_type_e type, map_t *map) { txn->writes_size = INITIAL_WRITES_SIZE; } - // aquire the read version for txn. + // aquire the read version for txn. must be careful to avoid a race do { txn->rv = version_; @@ -177,7 +190,7 @@ void txn_abort (txn_t *txn) { int i; for (i = 0; i < txn->writes_count; ++i) { update_rec_t *update = (update_rec_t *)txn->writes[i].rec; - update->version = TAG_VALUE(0); + update->version = ABORTED_VERSION; } nbd_defer_free(txn->writes); @@ -191,13 +204,26 @@ txn_state_e txn_commit (txn_t *txn) { txn_state_e state = txn_validate(txn); // Detach from its updates. - uint64_t wv = (txn->state == TXN_ABORTED) ? TAG_VALUE(0) : txn->wv; + uint64_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv; int i; for (i = 0; i < txn->writes_count; ++i) { update_rec_t *update = (update_rec_t *)txn->writes[i].rec; update->version = wv; } + /* + // Lower the reference count for 's read version + uint64_t temp = 1; + uint64_t old_count; + do { + old_count = temp; + temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1); + } while (old_count != temp); + if (old_count == 0 && version_ != txn->rv) { + map_remove(active_, (void *)txn->rv); + } + */ + nbd_defer_free(txn->writes); nbd_defer_free(txn); @@ -207,34 +233,48 @@ txn_state_e txn_commit (txn_t *txn) { // Get most recent committed version prior to our read version. uint64_t tm_get (txn_t *txn, void *key) { - // Iterate through update records associated with to find the latest committed version. - // We can use the first matching version. Older updates always come later in the list. + // Iterate through update records associated with to find the latest committed version prior to our + // read version. update_rec_t *update = (update_rec_t *) map_get(txn->map, key); - for (; update != NULL; update = update->prev) { - uint64_t writer_version = update->version; - if (writer_version < txn->rv) - return update->value; - - // If the version is tagged, it means that it is not a version number, but a pointer to an - // in progress transaction. - if (IS_TAGGED(update->version)) { - txn_t *writer = (txn_t *)STRIP_TAG(writer_version); - - if (writer == txn) - return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; - - // Skip updates from aborted transactions. - txn_state_e writer_state = writer->state; - if (EXPECT_FALSE(writer_state == TXN_ABORTED)) - continue; + for (; update != NULL; update = update->next) { - if (writer_state == TXN_VALIDATING) { - writer_state = txn_validate(writer); - } + // If the update's version is not tagged it means the update is committed. + if (!IS_TAGGED(update->version)) { + if (update->version <= txn->rv) + return update->value; + continue; + } + + // If the update's version is tagged then either the update was aborted or the the version number is + // actually a pointer to a running transaction's txn_t. - if (writer_state == TXN_VALIDATED && writer->wv <= txn->rv && writer->wv != UNDETERMINED_VERSION) - return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; + // Skip updates from aborted transactions. + if (EXPECT_FALSE(update->version == ABORTED_VERSION)) + continue; + + // The update's transaction is still in progress. Access its txn_t. + txn_t *writer = (txn_t *)STRIP_TAG(update->version); + if (writer == txn) // found our own update + return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; + + txn_state_e writer_state = writer->state; + if (writer_state == TXN_RUNNING) + continue; + + if (writer_state == TXN_VALIDATING) { + if (writer->wv > txn->rv) + continue; + writer_state = txn_validate(writer); } + + // Skip updates from aborted transactions. + if (writer_state == TXN_ABORTED) + continue; + + assert(writer_state == TXN_VALIDATED); + if (writer->wv > txn->rv) + continue; + return update->value; } return DOES_NOT_EXIST; } @@ -250,8 +290,8 @@ void tm_set (txn_t *txn, void *key, uint64_t value) { // push the new update record onto 's update list uint64_t update_prev; do { - update->prev = (update_rec_t *) map_get(txn->map, key); - update_prev = (uint64_t)update->prev; + update->next = (update_rec_t *) map_get(txn->map, key); + update_prev = (uint64_t)update->next; } while (map_cas(txn->map, key, update_prev, (uint64_t)update) != update_prev); // add to the write set for commit-time validation -- 2.40.0