From fa60fb1fb78136edc9e5863c0b86222f572963ef Mon Sep 17 00:00:00 2001 From: jdybnis Date: Mon, 1 Dec 2008 02:20:21 +0000 Subject: [PATCH] clean up transaction interface --- include/txn.h | 14 ++++- makefile | 4 +- test/map_test2.c | 116 +++++++++++++++++------------------ test/txn_test.c | 31 ++++++++++ todo | 11 ++-- txn/txn.c | 154 +++++++++++++++++++++++------------------------ 6 files changed, 185 insertions(+), 145 deletions(-) create mode 100644 test/txn_test.c diff --git a/include/txn.h b/include/txn.h index c23c59b..51db400 100644 --- a/include/txn.h +++ b/include/txn.h @@ -4,12 +4,20 @@ */ #ifndef TXN_H #define TXN_H + #include "map.h" -typedef enum { TXN_READ_WRITE, TXN_READ_ONLY, TXN_BLIND_WRITE } txn_access_t; -typedef enum { TXN_DIRTY_READ, TXN_READ_COMMITTED, TXN_REPEATABLE_READ } txn_isolation_t; +typedef enum { TXN_READ_WRITE, TXN_READ_ONLY, TXN_BLIND_WRITE } txn_access_e; +typedef enum { TXN_REPEATABLE_READ, TXN_READ_COMMITTED, TXN_DIRTY_READ } txn_isolation_e; +typedef enum { TXN_RUNNING, TXN_VALIDATING, TXN_VALIDATED, TXN_ABORTED } txn_state_e; typedef struct txn txn_t; -txn_t *txn_begin (txn_access_t access, txn_isolation_t isolation, map_t *map); +txn_t * txn_begin (txn_access_e access, txn_isolation_e isolation, map_type_t map_type); +void txn_abort (txn_t *txn); +txn_state_e txn_commit (txn_t *txn); + +uint64_t tm_get (txn_t *txn, const char *key, uint32_t key_len); +void tm_set (txn_t *txn, const char *key, uint32_t key_len, uint64_t value); + #endif//TXN_H diff --git a/makefile b/makefile index 6baa518..edd6735 100644 --- a/makefile +++ b/makefile @@ -7,14 +7,14 @@ OPT := -fwhole-program -combine -03 #-DNDEBUG CFLAGS := -g -Wall -Werror -std=c99 -m64 $(OPT) #-DENABLE_TRACE INCS := $(addprefix -I, include) -TESTS := output/map_test1 output/map_test2 output/rcu_test +TESTS := output/map_test1 output/map_test2 output/rcu_test output/txn_test EXES := $(TESTS) RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c MAP_SRCS := map/map.c map/nstring.c map/list.c map/skiplist.c map/hashtable.c rcu_test_SRCS := $(RUNTIME_SRCS) test/rcu_test.c -txn_test_SRCS := $(RUNTIME_SRCS) $(MAP_SRCS) txn/txn.c +txn_test_SRCS := $(RUNTIME_SRCS) $(MAP_SRCS) test/txn_test.c test/CuTest.c txn/txn.c map_test1_SRCS := $(RUNTIME_SRCS) $(MAP_SRCS) test/map_test1.c map_test2_SRCS := $(RUNTIME_SRCS) $(MAP_SRCS) test/map_test2.c test/CuTest.c diff --git a/test/map_test2.c b/test/map_test2.c index 7d31c87..b321a68 100644 --- a/test/map_test2.c +++ b/test/map_test2.c @@ -4,11 +4,12 @@ */ #include #include -#include "runtime.h" + #include "CuTest.h" + #include "common.h" +#include "runtime.h" #include "map.h" -#include "mem.h" #include "lwt.h" #define ASSERT_EQUAL(x, y) CuAssertIntEquals(tc, x, y) @@ -16,7 +17,7 @@ typedef struct worker_data { int id; CuTest *tc; - map_t *ht; + map_t *map; int *wait; } worker_data_t; @@ -25,56 +26,56 @@ static map_type_t map_type_; // Test some basic stuff; add a few keys, remove a few keys void basic_test (CuTest* tc) { - map_t *ht = map_alloc(map_type_); - - ASSERT_EQUAL( 0, map_count(ht) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_add(ht,"a",2,10) ); - ASSERT_EQUAL( 1, map_count(ht) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_add(ht,"b",2,20) ); - ASSERT_EQUAL( 2, map_count(ht) ); - ASSERT_EQUAL( 20, map_get(ht,"b",2) ); - ASSERT_EQUAL( 10, map_set(ht,"a",2,11) ); - ASSERT_EQUAL( 20, map_set(ht,"b",2,21) ); - ASSERT_EQUAL( 2, map_count(ht) ); - ASSERT_EQUAL( 21, map_add(ht,"b",2,22) ); - ASSERT_EQUAL( 11, map_remove(ht,"a",2) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_get(ht,"a",2) ); - ASSERT_EQUAL( 1, map_count(ht) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_remove(ht,"a",2) ); - ASSERT_EQUAL( 21, map_remove(ht,"b",2) ); - ASSERT_EQUAL( 0, map_count(ht) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_remove(ht,"b",2) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_remove(ht,"c",2) ); - ASSERT_EQUAL( 0, map_count(ht) ); + map_t *map = map_alloc(map_type_); + + ASSERT_EQUAL( 0, map_count(map) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_add(map,"a",2,10) ); + ASSERT_EQUAL( 1, map_count(map) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_add(map,"b",2,20) ); + ASSERT_EQUAL( 2, map_count(map) ); + ASSERT_EQUAL( 20, map_get(map,"b",2) ); + ASSERT_EQUAL( 10, map_set(map,"a",2,11) ); + ASSERT_EQUAL( 20, map_set(map,"b",2,21) ); + ASSERT_EQUAL( 2, map_count(map) ); + ASSERT_EQUAL( 21, map_add(map,"b",2,22) ); + ASSERT_EQUAL( 11, map_remove(map,"a",2) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_get(map,"a",2) ); + ASSERT_EQUAL( 1, map_count(map) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_remove(map,"a",2) ); + ASSERT_EQUAL( 21, map_remove(map,"b",2) ); + ASSERT_EQUAL( 0, map_count(map) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_remove(map,"b",2) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_remove(map,"c",2) ); + ASSERT_EQUAL( 0, map_count(map) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_add(ht,"d",2,40) ); - ASSERT_EQUAL( 40, map_get(ht,"d",2) ); - ASSERT_EQUAL( 1, map_count(ht) ); - ASSERT_EQUAL( 40, map_remove(ht,"d",2) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_get(ht,"d",2) ); - ASSERT_EQUAL( 0, map_count(ht) ); - - ASSERT_EQUAL( DOES_NOT_EXIST, map_replace(ht,"d",2,10) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_get(ht,"d",2) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_set(ht,"d",2,40) ); - ASSERT_EQUAL( 40, map_replace(ht,"d",2,41) ); - ASSERT_EQUAL( 41, map_get(ht,"d",2) ); - ASSERT_EQUAL( 41, map_remove(ht,"d",2) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_get(ht,"d",2) ); - ASSERT_EQUAL( 0, map_count(ht) ); - - ASSERT_EQUAL( DOES_NOT_EXIST, map_replace(ht,"b",2,20) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_get(ht,"b",2) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_add(map,"d",2,40) ); + ASSERT_EQUAL( 40, map_get(map,"d",2) ); + ASSERT_EQUAL( 1, map_count(map) ); + ASSERT_EQUAL( 40, map_remove(map,"d",2) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_get(map,"d",2) ); + ASSERT_EQUAL( 0, map_count(map) ); + + ASSERT_EQUAL( DOES_NOT_EXIST, map_replace(map,"d",2,10) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_get(map,"d",2) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_set(map,"d",2,40) ); + ASSERT_EQUAL( 40, map_replace(map,"d",2,41) ); + ASSERT_EQUAL( 41, map_get(map,"d",2) ); + ASSERT_EQUAL( 41, map_remove(map,"d",2) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_get(map,"d",2) ); + ASSERT_EQUAL( 0, map_count(map) ); + + ASSERT_EQUAL( DOES_NOT_EXIST, map_replace(map,"b",2,20) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_get(map,"b",2) ); // In the end, all entries should be removed - ASSERT_EQUAL( DOES_NOT_EXIST, map_set(ht,"b",2,20) ); - ASSERT_EQUAL( 20, map_replace(ht,"b",2,21) ); - ASSERT_EQUAL( 21, map_get(ht,"b",2) ); - ASSERT_EQUAL( 21, map_remove(ht,"b",2) ); - ASSERT_EQUAL( DOES_NOT_EXIST, map_get(ht,"b",2) ); - ASSERT_EQUAL( 0, map_count(ht) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_set(map,"b",2,20) ); + ASSERT_EQUAL( 20, map_replace(map,"b",2,21) ); + ASSERT_EQUAL( 21, map_get(map,"b",2) ); + ASSERT_EQUAL( 21, map_remove(map,"b",2) ); + ASSERT_EQUAL( DOES_NOT_EXIST, map_get(map,"b",2) ); + ASSERT_EQUAL( 0, map_count(map) ); - map_free(ht); + map_free(map); // In a quiecent state; it is safe to free. rcu_update(); @@ -82,7 +83,7 @@ void basic_test (CuTest* tc) { void *simple_worker (void *arg) { worker_data_t *wd = (worker_data_t *)arg; - map_t *ht = wd->ht; + map_t *map = wd->map; CuTest* tc = wd->tc; uint64_t d = wd->id; int iters = map_type_ == MAP_TYPE_LIST ? 100 : 1000000; @@ -95,14 +96,14 @@ void *simple_worker (void *arg) { char key[10]; sprintf(key, "k%u", i); TRACE("t0", "test map_add() iteration (%llu, %llu)", j, i); - CuAssertIntEquals_Msg(tc, key, DOES_NOT_EXIST, map_add(ht, key, strlen(key)+1, d+1) ); + CuAssertIntEquals_Msg(tc, key, DOES_NOT_EXIST, map_add(map, key, strlen(key)+1, d+1) ); rcu_update(); } for (int i = d; i < iters; i+=2) { char key[10]; sprintf(key, "k%u", i); TRACE("t0", "test map_remove() iteration (%llu, %llu)", j, i); - CuAssertIntEquals_Msg(tc, key, d+1, map_remove(ht, key, strlen(key)+1) ); + CuAssertIntEquals_Msg(tc, key, d+1, map_remove(map, key, strlen(key)+1) ); rcu_update(); } } @@ -115,14 +116,14 @@ void simple_add_remove (CuTest* tc) { pthread_t thread[2]; worker_data_t wd[2]; int wait = 2; - map_t *ht = map_alloc(map_type_); + map_t *map = map_alloc(map_type_); // In 2 threads, add & remove even & odd elements concurrently int i; for (i = 0; i < 2; ++i) { wd[i].id = i; wd[i].tc = tc; - wd[i].ht = ht; + wd[i].map = map; wd[i].wait = &wait; int rc = nbd_thread_create(thread + i, i, simple_worker, wd + i); if (rc != 0) { perror("nbd_thread_create"); return; } @@ -132,17 +133,17 @@ void simple_add_remove (CuTest* tc) { } // In the end, all members should be removed - ASSERT_EQUAL( 0, map_count(ht) ); + ASSERT_EQUAL( 0, map_count(map) ); // In a quiecent state; it is safe to free. - map_free(ht); + map_free(map); } void *inserter_worker (void *arg) { //pthread_t thread[NUM_THREADS]; - //map_t *ht = map_alloc(MAP_TYPE_HASHTABLE); + //map_t *map = map_alloc(map_type_); return NULL; } @@ -167,7 +168,6 @@ int main (void) { SUITE_ADD_TEST(suite, simple_add_remove); CuSuiteRun(suite); - CuSuiteSummary(suite, output); CuSuiteDetails(suite, output); printf("%s\n", output->buffer); } diff --git a/test/txn_test.c b/test/txn_test.c new file mode 100644 index 0000000..e20027e --- /dev/null +++ b/test/txn_test.c @@ -0,0 +1,31 @@ +#include +#include "CuTest.h" + +#include "common.h" +#include "runtime.h" +#include "txn.h" + +#define ASSERT_EQUAL(x, y) CuAssertIntEquals(tc, x, y) + +void test1 (CuTest* tc) { + txn_t *tm = txn_begin(TXN_READ_WRITE, TXN_REPEATABLE_READ, MAP_TYPE_LIST); + tm_set(tm, "abc", 4, 2); + tm_set(tm, "abc", 4, 3); + ASSERT_EQUAL( 3, tm_get(tm, "abc", 4) ); + ASSERT_EQUAL( TXN_VALIDATED, txn_commit(tm)); +} + +int main (void) { + + nbd_init(); + + CuString *output = CuStringNew(); + CuSuite* suite = CuSuiteNew(); + SUITE_ADD_TEST(suite, test1); + CuSuiteRun(suite); + CuSuiteDetails(suite, output); + printf("%s\n", output->buffer); + + return 0; +} + diff --git a/todo b/todo index 372b4f7..e052f1c 100644 --- a/todo +++ b/todo @@ -1,10 +1,13 @@ -- make rcu wait when its buffer gets full, instead of crashing -- fix makefile to compute dependency info as a side-effect of compilation (-MF) +- make rcu wait when its buffer gets full, instead of throwing an assert ++ fix makefile to compute dependency info as a side-effect of compilation (-MF) - investigate 16 byte CAS; ht can store GUIDs inline instead of pointers to actual keys -- test ht +- testing, testing, testing +- support integer keys for ht +- validate arguments to interface functions + optimize tracing code, still too much overhead + use NULL instead of a sentinal node in skiplist and list -- make interfaces for all data structures consistent ++ make the interfaces for all data structures consistent + make list and skiplist use string keys + optimize integer keys - ht_print() +- iterators diff --git a/txn/txn.c b/txn/txn.c index b0b6174..1073f7b 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -9,7 +9,6 @@ #define UNDETERMINED_VERSION 0 #define INITIAL_WRITES_SIZE 4 -typedef enum { TXN_RUNNING, TXN_VALIDATING, TXN_VALIDATED, TXN_ABORTED } txn_state_t; typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t; typedef struct update_rec update_rec_t; @@ -29,83 +28,25 @@ typedef struct write_rec { struct txn { uint64_t rv; uint64_t wv; - hashtable_t *ht; + map_t *map; write_rec_t *writes; uint32_t writes_size; uint32_t writes_count; uint32_t writes_scan; - txn_access_t access; - txn_isolation_t isolation; - txn_state_t state; + txn_access_e access; + txn_isolation_e isolation; + txn_state_e state; }; -uint64_t GlobalVersion = 1; -uint64_t MinActiveTxnVersion = 0; +static txn_state_e txn_validate (txn_t *txn); -static txn_state_t txn_validate (txn_t *txn); - -update_rec_t *alloc_update_rec (void) { - update_rec_t *u = (update_rec_t *)nbd_malloc(sizeof(update_rec_t)); - memset(u, 0, sizeof(update_rec_t)); - return u; -} - -txn_t *txn_begin (txn_access_t access, txn_isolation_t isolation, hashtable_t *ht) { - txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t)); - memset(txn, 0, sizeof(txn_t)); - txn->access = access; - txn->isolation = isolation; - txn->rv = GlobalVersion; - txn->wv = UNDETERMINED_VERSION; - txn->state = TXN_RUNNING; - txn->ht = ht; - if (isolation != TXN_READ_ONLY) { - txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE); - txn->writes_size = INITIAL_WRITES_SIZE; - } - return txn; -} - -// Get most recent committed version prior to our read version. -int64_t txn_ht_get (txn_t *txn, const char *key, uint32_t key_len) { - - // Iterate through update records associated with to find the latest committed version. - // We can use the first matching version. Older updates always come later in the list. - update_rec_t *update = (update_rec_t *) ht_get(txn->ht, key, key_len); - for (; update != NULL; update = update->prev) { - uint64_t writer_version = update->version; - if (writer_version < txn->rv) - return update->value; - - // If the version is tagged, it means that it is not a version number, but a pointer to an - // in progress transaction. - if (IS_TAGGED(update->version)) { - txn_t *writer = (txn_t *)STRIP_TAG(writer_version); - - if (writer == txn) - return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; - - // Skip updates from aborted transactions. - txn_state_t writer_state = writer->state; - if (EXPECT_FALSE(writer_state == TXN_ABORTED)) - continue; - - if (writer_state == TXN_VALIDATING) { - writer_state = txn_validate(writer); - } - - if (writer_state == TXN_VALIDATED && writer->wv <= txn->rv && writer->wv != UNDETERMINED_VERSION) - return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; - } - } - return DOES_NOT_EXIST; -} +static uint64_t version_ = 1; // Validate the updates for . Validation fails for a key we have written to if there is a // write committed newer than our read version. -static txn_state_t txn_ht_validate_key (txn_t *txn, const char *key, uint32_t key_len) { +static txn_state_e tm_validate_key (txn_t *txn, const char *key, uint32_t key_len) { - update_rec_t *update = (update_rec_t *) ht_get(txn->ht, key, key_len); + update_rec_t *update = (update_rec_t *) map_get(txn->map, key, key_len); for (; update != NULL; update = update->prev) { uint64_t writer_version = update->version; if (writer_version <= txn->rv) @@ -127,7 +68,7 @@ static txn_state_t txn_ht_validate_key (txn_t *txn, const char *key, uint32_t ke if (writer_version <= txn->rv && writer_version != UNDETERMINED_VERSION) return TXN_VALIDATED; - txn_state_t writer_state = writer->state; + txn_state_e writer_state = writer->state; if (EXPECT_FALSE(writer_state == TXN_ABORTED)) continue; @@ -150,18 +91,18 @@ static txn_state_t txn_ht_validate_key (txn_t *txn, const char *key, uint32_t ke return TXN_VALIDATED; } -static txn_state_t txn_validate (txn_t *txn) { +static txn_state_e txn_validate (txn_t *txn) { int i; switch (txn->state) { case TXN_VALIDATING: if (txn->wv == UNDETERMINED_VERSION) { - uint64_t wv = SYNC_ADD(&GlobalVersion, 1); + uint64_t wv = SYNC_ADD(&version_, 1); SYNC_CAS(&txn->wv, UNDETERMINED_VERSION, wv); } for (i = 0; i < txn->writes_count; ++i) { - txn_state_t s = txn_ht_validate_key(txn, txn->writes[i].key, strlen(txn->writes[i].key)); + txn_state_e s = tm_validate_key(txn, txn->writes[i].key, strlen(txn->writes[i].key)); if (s == TXN_ABORTED) { txn->state = TXN_ABORTED; break; @@ -183,6 +124,28 @@ static txn_state_t txn_validate (txn_t *txn) { return txn->state; } +static update_rec_t *alloc_update_rec (void) { + update_rec_t *u = (update_rec_t *)nbd_malloc(sizeof(update_rec_t)); + memset(u, 0, sizeof(update_rec_t)); + return u; +} + +txn_t *txn_begin (txn_access_e access, txn_isolation_e isolation, map_type_t map_type) { + txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t)); + memset(txn, 0, sizeof(txn_t)); + txn->access = access; + txn->isolation = isolation; + txn->rv = version_; + txn->wv = UNDETERMINED_VERSION; + txn->state = TXN_RUNNING; + txn->map = map_alloc(map_type); + if (isolation != TXN_READ_ONLY) { + txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE); + txn->writes_size = INITIAL_WRITES_SIZE; + } + return txn; +} + void txn_abort (txn_t *txn) { int i; @@ -195,11 +158,11 @@ void txn_abort (txn_t *txn) { nbd_defer_free(txn); } -txn_state_t txn_commit (txn_t *txn) { +txn_state_e txn_commit (txn_t *txn) { assert(txn->state == TXN_RUNNING); txn->state = TXN_VALIDATING; - txn_state_t state = txn_validate(txn); + txn_state_e state = txn_validate(txn); // Detach from its updates. uint64_t wv = (txn->state == TXN_ABORTED) ? TAG_VALUE(0) : txn->wv; @@ -215,7 +178,42 @@ txn_state_t txn_commit (txn_t *txn) { return state; } -void txn_ht_put (txn_t *txn, const char *key, uint32_t key_len, int64_t value) { +// Get most recent committed version prior to our read version. +uint64_t tm_get (txn_t *txn, const char *key, uint32_t key_len) { + + // Iterate through update records associated with to find the latest committed version. + // We can use the first matching version. Older updates always come later in the list. + update_rec_t *update = (update_rec_t *) map_get(txn->map, key, key_len); + for (; update != NULL; update = update->prev) { + uint64_t writer_version = update->version; + if (writer_version < txn->rv) + return update->value; + + // If the version is tagged, it means that it is not a version number, but a pointer to an + // in progress transaction. + if (IS_TAGGED(update->version)) { + txn_t *writer = (txn_t *)STRIP_TAG(writer_version); + + if (writer == txn) + return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; + + // Skip updates from aborted transactions. + txn_state_e writer_state = writer->state; + if (EXPECT_FALSE(writer_state == TXN_ABORTED)) + continue; + + if (writer_state == TXN_VALIDATING) { + writer_state = txn_validate(writer); + } + + if (writer_state == TXN_VALIDATED && writer->wv <= txn->rv && writer->wv != UNDETERMINED_VERSION) + return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value; + } + } + return DOES_NOT_EXIST; +} + +void tm_set (txn_t *txn, const char *key, uint32_t key_len, uint64_t value) { // create a new update record update_rec_t *update = alloc_update_rec(); @@ -224,11 +222,11 @@ void txn_ht_put (txn_t *txn, const char *key, uint32_t key_len, int64_t value) { update->version = TAG_VALUE((uint64_t)txn); // push the new update record onto 's update list - int64_t update_prev; + uint64_t update_prev; do { - update->prev = (update_rec_t *) ht_get(txn->ht, key, key_len); - update_prev = (int64_t)update->prev; - } while (ht_compare_and_set(txn->ht, key, key_len, update_prev, (int64_t)update) != update_prev); + update->prev = (update_rec_t *) map_get(txn->map, key, key_len); + update_prev = (uint64_t)update->prev; + } while (map_cas(txn->map, key, key_len, update_prev, (uint64_t)update) != update_prev); // add to the write set for commit-time validation if (txn->writes_count == txn->writes_size) { -- 2.40.0