From a1d0b3ca99552878b1becf561d8f3291992aaa67 Mon Sep 17 00:00:00 2001 From: jdybnis Date: Mon, 29 Dec 2008 04:48:45 +0000 Subject: [PATCH] port to 32 bit x86 linux --- include/runtime.h | 1 + include/tls.h | 6 +- include/txn.h | 2 - makefile | 6 +- map/hashtable.c | 31 ++++++-- map/list.c | 7 +- map/skiplist.c | 14 ++-- runtime/lwt.c | 6 +- runtime/mem.c | 1 + runtime/rlocal.h | 3 +- runtime/runtime.c | 4 +- test/haz_test.c | 2 +- test/rcu_test.c | 15 +++- test/txn_test.c | 2 +- txn/txn.c | 186 +++++++++++++++++++++++++++++----------------- 15 files changed, 184 insertions(+), 102 deletions(-) diff --git a/include/runtime.h b/include/runtime.h index 5a87c88..c9db783 100644 --- a/include/runtime.h +++ b/include/runtime.h @@ -5,6 +5,7 @@ #ifndef RUNTIME_H #define RUNTIME_H +#include #include "tls.h" extern DECLARE_THREAD_LOCAL(tid_, int); diff --git a/include/tls.h b/include/tls.h index 865e6da..5f3d0e1 100644 --- a/include/tls.h +++ b/include/tls.h @@ -9,9 +9,9 @@ #define TLS_H #ifdef __ELF__ // use gcc thread-local storage (i.e. __thread variables) -#define DECLARE_THREAD_LOCAL (name, type) type name -#define INIT_THREAD_LOCAL (name, value) name = value -#define SET_THREAD_LOCAL (name, value) name = value +#define DECLARE_THREAD_LOCAL(name, type) __thread type name +#define INIT_THREAD_LOCAL(name) +#define SET_THREAD_LOCAL(name, value) name = value #define LOCALIZE_THREAD_LOCAL(name, type) #else//!__ELF__ diff --git a/include/txn.h b/include/txn.h index 7cc40d0..a196cd3 100644 --- a/include/txn.h +++ b/include/txn.h @@ -11,8 +11,6 @@ typedef enum { TXN_RUNNING, TXN_VALIDATING, TXN_VALIDATED, TXN_ABORTED } txn_sta typedef struct txn txn_t; -void txn_init (void); - txn_t * txn_begin (map_t *map); void txn_abort (txn_t *txn); txn_state_e txn_commit (txn_t *txn); diff --git a/makefile b/makefile index ebcaae6..8b0738b 100644 --- a/makefile +++ b/makefile @@ -4,10 +4,10 @@ ################################################################################################### # Makefile for building programs with whole-program interfile optimization ################################################################################################### -OPT := -fwhole-program -combine -03 #-DNDEBUG -CFLAGS := -g -Wall -Werror -std=c99 $(OPT) -m64 #-DLIST_USE_HAZARD_POINTER -DENABLE_TRACE #-DTEST_STRING_KEYS #-DNBD32 +OPT := -O3 #-DNDEBUG #-fwhole-program -combine +CFLAGS := -g -Wall -Werror -std=c99 $(OPT) -lpthread #-DNBD32 -DENABLE_TRACE #-DLIST_USE_HAZARD_POINTER #-DTEST_STRING_KEYS # INCS := $(addprefix -I, include) -TESTS := output/map_test2 output/map_test1 output/txn_test +TESTS := output/map_test2 output/map_test1 output/txn_test output/rcu_test output/haz_test EXES := $(TESTS) RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c datatype/nstring.c runtime/hazard.c diff --git a/map/hashtable.c b/map/hashtable.c index ba62692..88bf631 100644 --- a/map/hashtable.c +++ b/map/hashtable.c @@ -95,7 +95,7 @@ static volatile entry_t *hti_lookup (hti_t *hti, map_key_t key, uint32_t key_has map_key_t ent_key = ent->key; if (ent_key == DOES_NOT_EXIST) { TRACE("h1", "hti_lookup: entry %p for key %p is empty", ent, - (hti->ht->key_type == NULL) ? (void *)ent_key : GET_PTR(ent_key)); + (hti->ht->key_type == NULL) ? (void *)key : GET_PTR(key)); *is_empty = 1; // indicate an empty so the caller avoids an expensive key compare return ent; } @@ -233,9 +233,11 @@ static int hti_copy_entry (hti_t *ht1, volatile entry_t *ht1_ent, uint32_t key_h // We use 0 to indicate that is uninitiallized. Occasionally the key's hash will really be 0 and we // waste time recomputing it every time. It is rare enough that it won't hurt performance. if (key_hash == 0) { - key_hash = (ht1->ht->key_type == NULL) - ? murmur32_8b(ht1_ent_key) - : ht1->ht->key_type->hash((void *)key); +#ifdef NBD32 + key_hash = (ht1->ht->key_type == NULL) ? murmur32_4b(ht1_ent_key) : ht1->ht->key_type->hash((void *)key); +#else + key_hash = (ht1->ht->key_type == NULL) ? murmur32_8b(ht1_ent_key) : ht1->ht->key_type->hash((void *)key); +#endif } int ht2_ent_is_empty; @@ -442,7 +444,11 @@ static map_val_t hti_get (hti_t *hti, map_key_t key, uint32_t key_hash) { // map_val_t ht_get (hashtable_t *ht, map_key_t key) { +#ifdef NBD32 + uint32_t hash = (ht->key_type == NULL) ? murmur32_4b((uint64_t)key) : ht->key_type->hash((void *)key); +#else uint32_t hash = (ht->key_type == NULL) ? murmur32_8b((uint64_t)key) : ht->key_type->hash((void *)key); +#endif return hti_get(ht->hti, key, hash); } @@ -538,7 +544,11 @@ map_val_t ht_cas (hashtable_t *ht, map_key_t key, map_val_t expected_val, map_va } map_val_t old_val; +#ifdef NBD32 + uint32_t key_hash = (ht->key_type == NULL) ? murmur32_4b((uint64_t)key) : ht->key_type->hash((void *)key); +#else uint32_t key_hash = (ht->key_type == NULL) ? murmur32_8b((uint64_t)key) : ht->key_type->hash((void *)key); +#endif while ((old_val = hti_cas(hti, key, key_hash, expected_val, new_val)) == COPIED_VALUE) { assert(hti->next); hti = hti->next; @@ -552,7 +562,11 @@ map_val_t ht_cas (hashtable_t *ht, map_key_t key, map_val_t expected_val, map_va map_val_t ht_remove (hashtable_t *ht, map_key_t key) { hti_t *hti = ht->hti; map_val_t val; +#ifdef NBD32 + uint32_t key_hash = (ht->key_type == NULL) ? murmur32_4b((uint64_t)key) : ht->key_type->hash((void *)key); +#else uint32_t key_hash = (ht->key_type == NULL) ? murmur32_8b((uint64_t)key) : ht->key_type->hash((void *)key); +#endif do { val = hti_cas(hti, key, key_hash, CAS_EXPECT_WHATEVER, DOES_NOT_EXIST); if (val != COPIED_VALUE) @@ -653,9 +667,12 @@ map_val_t ht_iter_next (ht_iter_t *iter, map_key_t *key_ptr) { *key_ptr = key; } if (val == COPIED_VALUE) { - uint32_t hash = (iter->hti->ht->key_type == NULL) - ? murmur32_8b((uint64_t)key) - : iter->hti->ht->key_type->hash((void *)key); + const datatype_t *key_type = iter->hti->ht->key_type; +#ifdef NBD32 + uint32_t hash = (key_type == NULL) ? murmur32_4b((uint64_t)key) : key_type->hash((void *)key); +#else + uint32_t hash = (key_type == NULL) ? murmur32_8b((uint64_t)key) : key_type->hash((void *)key); +#endif val = hti_get(iter->hti->next, (map_key_t)ent->key, hash); } diff --git a/map/list.c b/map/list.c index 0d9c11f..bc191cd 100644 --- a/map/list.c +++ b/map/list.c @@ -34,13 +34,14 @@ struct ll { }; // Marking the field of a node logically removes it from the list -#define MARK_NODE(x) TAG_VALUE((markable_t)(x), TAG1) -#define HAS_MARK(x) (IS_TAGGED((x), TAG1) == TAG1) +#define MARK_NODE(x) TAG_VALUE((markable_t)(x), 0x1) +#define HAS_MARK(x) (IS_TAGGED((x), 0x1) == 0x1) #define GET_NODE(x) ((node_t *)(x)) -#define STRIP_MARK(x) ((node_t *)STRIP_TAG((x), TAG1)) +#define STRIP_MARK(x) ((node_t *)STRIP_TAG((x), 0x1)) static node_t *node_alloc (map_key_t key, map_val_t val) { node_t *item = (node_t *)nbd_malloc(sizeof(node_t)); + assert(!HAS_MARK((size_t)item)); item->key = key; item->val = val; return item; diff --git a/map/skiplist.c b/map/skiplist.c index 9887a43..6e02e12 100644 --- a/map/skiplist.c +++ b/map/skiplist.c @@ -47,15 +47,15 @@ struct sl { // Marking the field of a node logically removes it from the list #if 0 -static inline markable_t MARK_NODE(node_t * x) { return TAG_VALUE((markable_t)x, TAG1); } -static inline int HAS_MARK(markable_t x) { return (IS_TAGGED(x, TAG1) == TAG1); } +static inline markable_t MARK_NODE(node_t * x) { return TAG_VALUE((markable_t)x, 0x1); } +static inline int HAS_MARK(markable_t x) { return (IS_TAGGED(x, 0x1) == 0x1); } static inline node_t * GET_NODE(markable_t x) { assert(!HAS_MARK(x)); return (node_t *)x; } -static inline node_t * STRIP_MARK(markable_t x) { return ((node_t *)STRIP_TAG(x, TAG1)); } +static inline node_t * STRIP_MARK(markable_t x) { return ((node_t *)STRIP_TAG(x, 0x1)); } #else -#define MARK_NODE(x) TAG_VALUE((markable_t)(x), TAG1) -#define HAS_MARK(x) (IS_TAGGED((x), TAG1) == TAG1) +#define MARK_NODE(x) TAG_VALUE((markable_t)(x), 0x1) +#define HAS_MARK(x) (IS_TAGGED((x), 0x1) == 0x1) #define GET_NODE(x) ((node_t *)(x)) -#define STRIP_MARK(x) ((node_t *)STRIP_TAG((x), TAG1)) +#define STRIP_MARK(x) ((node_t *)STRIP_TAG((x), 0x1)) #endif static int random_level (void) { @@ -117,7 +117,7 @@ static node_t *find_preds (node_t **preds, node_t **succs, int n, skiplist_t *sl node_t *pred = sl->head; node_t *item = NULL; TRACE("s2", "find_preds: searching for key %p in skiplist (head is %p)", key, pred); - int d; + int d = 0; int start_level = MAX_LEVEL; #if MAX_LEVEL > 2 // Optimization for small lists. No need to traverse empty higher levels. diff --git a/runtime/lwt.c b/runtime/lwt.c index 0c3147f..c25fd95 100644 --- a/runtime/lwt.c +++ b/runtime/lwt.c @@ -19,8 +19,8 @@ volatile int halt_ = 0; typedef struct lwt_record { uint64_t timestamp; uint64_t format; - uint64_t value1; - uint64_t value2; + size_t value1; + size_t value2; } lwt_record_t; typedef struct lwt_buffer { @@ -51,7 +51,7 @@ void lwt_set_trace_level (const char *flags) } } -static inline void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset) +static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset) { // print the record if its trace category is enabled at a high enough level int flag = r->format >> 56; diff --git a/runtime/mem.c b/runtime/mem.c index 1787a62..f8cecec 100644 --- a/runtime/mem.c +++ b/runtime/mem.c @@ -4,6 +4,7 @@ * * Extreamly fast multi-threaded malloc. 64 bit platforms only! */ +#define _BSD_SOURCE // so we get MAP_ANON on linux #include #include #include diff --git a/runtime/rlocal.h b/runtime/rlocal.h index 4828c6e..1c727bb 100644 --- a/runtime/rlocal.h +++ b/runtime/rlocal.h @@ -1,7 +1,8 @@ #ifndef RLOCAL_H #define RLOCAL_H + +#include "runtime.h" #include "tls.h" -DECLARE_THREAD_LOCAL(tid_, int); void mem_init (void); diff --git a/runtime/runtime.c b/runtime/runtime.c index 14a09f9..ceb6772 100644 --- a/runtime/runtime.c +++ b/runtime/runtime.c @@ -2,6 +2,8 @@ * Written by Josh Dybnis and released to the public domain, as explained at * http://creativecommons.org/licenses/publicdomain */ +#define _POSIX_C_SOURCE 1 // for rand_r() +#include #include #include "common.h" #include "runtime.h" @@ -19,7 +21,7 @@ typedef struct thread_info { } thread_info_t; __attribute__ ((constructor)) void nbd_init (void) { - sranddev(); + //sranddev(); INIT_THREAD_LOCAL(rand_seed_); INIT_THREAD_LOCAL(tid_); SET_THREAD_LOCAL(tid_, 0); diff --git a/test/haz_test.c b/test/haz_test.c index 9e6f1f2..3b9092a 100644 --- a/test/haz_test.c +++ b/test/haz_test.c @@ -77,7 +77,7 @@ void *worker (void *arg) { int main (int argc, char **argv) { //lwt_set_trace_level("m0r0"); - int num_threads = MAX_NUM_THREADS; + int num_threads = 2; if (argc == 2) { errno = 0; diff --git a/test/rcu_test.c b/test/rcu_test.c index 96d196b..d469648 100644 --- a/test/rcu_test.c +++ b/test/rcu_test.c @@ -1,9 +1,12 @@ +#define _POSIX_C_SOURCE 1 // for rand_r #include #include #include +#include #include "common.h" #include "runtime.h" #include "mem.h" +#include "rcu.h" #define NUM_ITERATIONS 10000000 @@ -65,7 +68,7 @@ void *worker (void *arg) { } else { node_t *x = lifo_aba_pop(stk_); if (x) { - nbd_defer_free(x); + rcu_defer_free(x); } } rcu_update(); @@ -75,7 +78,6 @@ void *worker (void *arg) { } int main (int argc, char **argv) { - nbd_init(); //lwt_set_trace_level("m0r0"); int num_threads = 2; @@ -96,6 +98,10 @@ int main (int argc, char **argv) { stk_ = lifo_alloc(); wait_ = num_threads; + struct timeval tv1, tv2; + gettimeofday(&tv1, NULL); + wait_ = num_threads; + pthread_t thread[num_threads]; for (int i = 0; i < num_threads; ++i) { int rc = nbd_thread_create(thread + i, i, worker, (void *)(size_t)i); @@ -105,5 +111,10 @@ int main (int argc, char **argv) { pthread_join(thread[i], NULL); } + gettimeofday(&tv2, NULL); + int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000; + printf("Th:%d Time:%dms\n\n", num_threads, ms); + fflush(stdout); + return 0; } diff --git a/test/txn_test.c b/test/txn_test.c index 8ad8096..651e1c0 100644 --- a/test/txn_test.c +++ b/test/txn_test.c @@ -26,7 +26,7 @@ void test1 (CuTest* tc) { int main (void) { - txn_init(); + lwt_set_trace_level("x3h3"); CuString *output = CuStringNew(); CuSuite* suite = CuSuiteNew(); diff --git a/txn/txn.c b/txn/txn.c index 1cc8e2f..a932f66 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -1,4 +1,4 @@ -/* +/* * Written by Josh Dybnis and released to the public domain, as explained at * http://creativecommons.org/licenses/publicdomain */ @@ -6,24 +6,27 @@ #include "txn.h" #include "mem.h" #include "rcu.h" +#include "lwt.h" #include "skiplist.h" #define UNDETERMINED_VERSION 0 #define ABORTED_VERSION TAG_VALUE(0, TAG1) #define INITIAL_WRITES_SIZE 4 +#define PTR_TO_VAL(x) ((size_t)(x) >> 2) +#define VAL_TO_PTR(x) ((update_t *)((x) << 2)) typedef struct update_rec update_t; typedef map_key_t version_t; struct update_rec { - version_t version; + version_t version; // tagged versions are txn_t pointers, untagged are actual version numbers map_val_t value; map_val_t next; // an earlier update }; typedef struct write_rec { map_key_t key; - update_t *rec; + update_t *rec; } write_rec_t; struct txn { @@ -43,35 +46,35 @@ static version_t version_ = 1; static skiplist_t *active_ = NULL; -void txn_init (void) { +__attribute__ ((constructor)) void txn_init (void) { active_ = sl_alloc(NULL); } -// Validate the updates for . Validation fails if there is a write-write conflict. That is if after our +// Validate the updates for . Validation fails if there is a write-write conflict. That is if after our // read version another transaction committed a change to an entry we are also trying to change. // -// If we encounter a potential conflict with a transaction that is in the process of validating, we help it +// If we encounter a potential conflict with a transaction that is in the process of validating, we help it // complete validating. It must be finished before we can decide to rollback or commit. // static txn_state_e validate_key (txn_t *txn, map_key_t key) { assert(txn->state != TXN_RUNNING); - + map_val_t val = map_get(txn->map, key); update_t *update = NULL; for (; val != DOES_NOT_EXIST; val = update->next) { // If the update or its version is not tagged it means the update is committed. // - // We can stop at the first committed record we find that is at least as old as our read version. All - // the other committed records following it will be older. And all the uncommitted records following it + // We can stop at the first committed record we find that is at least as old as our read version. All + // the other committed records following it will be older. And all the uncommitted records following it // will eventually conflict with it and abort. if (!IS_TAGGED(val, TAG2)) return TXN_VALIDATED; - update = (update_t *)STRIP_TAG(val, TAG2); - if (!IS_TAGGED(update->version, TAG1)) + update = VAL_TO_PTR(val); + if (!IS_TAGGED(update->version, TAG1)) return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED; - // If the update's version is tagged then either the update was aborted or the the version number is + // If the update's version is tagged then either the update was aborted or the the version number is // actually a pointer to a running transaction's txn_t. // Skip aborted transactions. @@ -79,20 +82,20 @@ static txn_state_e validate_key (txn_t *txn, map_key_t key) { continue; // The update's transaction is still in progress. Access its txn_t. - txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1); + txn_t *writer = (txn_t *)VAL_TO_PTR(update->version); if (writer == txn) continue; // Skip our own updates. txn_state_e writer_state = writer->state; - // Any running transaction will only be able to acquire a wv greater than ours. A transaction changes its + // Any running transaction will only be able to acquire a wv greater than ours. A transaction changes its // state to validating before aquiring a wv. We can ignore an unvalidated transaction if its version is - // greater than ours. See next comment below for why. + // greater than ours. See the next comment below for the explination why. if (writer_state == TXN_RUNNING) - continue; - + continue; + // If has a later version than us we can safely ignore its updates. It will not commit until - // we have completed validation (in order to remain non-blocking it will help us validate if necessary). - // This protocol ensures a deterministic resolution to every conflict and avoids infinite ping-ponging + // we have completed validation (in order to remain non-blocking it will help us validate if necessary). + // This protocol ensures a deterministic resolution to every conflict and avoids infinite ping-ponging // between validating two conflicting transactions. if (writer_state == TXN_VALIDATING) { if (writer->wv > txn->wv) @@ -148,13 +151,16 @@ static txn_state_e txn_validate (txn_t *txn) { return txn->state; } -static update_t *alloc_update_rec (void) { +static update_t *alloc_update_rec (version_t ver, map_val_t val) { update_t *u = (update_t *)nbd_malloc(sizeof(update_t)); - memset(u, 0, sizeof(update_t)); + u->version = ver; + u->value = val; + u->next = DOES_NOT_EXIST; return u; } txn_t *txn_begin (map_t *map) { + TRACE("x1", "txn_begin: map %p", map, 0); txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t)); memset(txn, 0, sizeof(txn_t)); txn->wv = UNDETERMINED_VERSION; @@ -184,12 +190,13 @@ txn_t *txn_begin (map_t *map) { } while (temp != old_count); } while (1); + TRACE("x1", "txn_begin: returning new transaction %p (read version %p)", txn, txn->rv); return txn; } void txn_abort (txn_t *txn) { if (txn->state != TXN_RUNNING) - return; // TODO: return some sort of error code + return; int i; for (i = 0; i < txn->writes_count; ++i) { @@ -203,7 +210,7 @@ void txn_abort (txn_t *txn) { txn_state_e txn_commit (txn_t *txn) { if (txn->state != TXN_RUNNING) - return txn->state; // TODO: return some sort of error code + return txn->state; assert(txn->state == TXN_RUNNING); txn->state = TXN_VALIDATING; @@ -213,7 +220,7 @@ txn_state_e txn_commit (txn_t *txn) { version_t wv = (txn->state == TXN_ABORTED) ? ABORTED_VERSION : txn->wv; int i; for (i = 0; i < txn->writes_count; ++i) { - update_t *update = (update_t *)txn->writes[i].rec; + update_t *update = txn->writes[i].rec; update->version = wv; } @@ -237,100 +244,132 @@ txn_state_e txn_commit (txn_t *txn) { // Get most recent committed version prior to our read version. map_val_t txn_map_get (txn_t *txn, map_key_t key) { - if (txn->state != TXN_RUNNING) + TRACE("x1", "txn_map_get: txn %p map %p", txn, txn->map); + TRACE("x1", "txn_map_get: key %p", key, 0); + + if (txn->state != TXN_RUNNING) { + TRACE("x1", "txn_map_get: error txn not running (state %p)", txn->state, 0); return ERROR_TXN_NOT_RUNNING; + } - // Iterate through the update records to find the latest committed version prior to our read version. + // Iterate through the update records to find the latest committed version prior to our read version. map_val_t newest_val = map_get(txn->map, key); map_val_t val = newest_val; - update_t *update = NULL; - for ( ; ; val = update->next) { - - if (!IS_TAGGED(val, TAG2)) + update_t *update; + for ( ; (update = VAL_TO_PTR(val)) != NULL ; val = update->next) { + + // If TAG2 is set in it indicates that is an update record. Otherwise all the following are + // true: is a literal value, it is older than any currently active transaction, and it is the most + // recently set value for its key. Therefore it is visible to . + if (!IS_TAGGED(val, TAG2)) { + TRACE("x1", "txn_map_get: found untagged value; returning %p", val, 0); return val; - - update = (update_t *)STRIP_TAG(val, TAG2); - assert(update != NULL); + } // If the update's version is not tagged it means the update is committed. if (!IS_TAGGED(update->version, TAG1)) { - if (update->version <= txn->rv) + if (update->version <= txn->rv) { + TRACE("x2", "txn_map_get: found committed update %p (version %p)", update, update->version); break; // success + } + TRACE("x2", "txn_map_get: skipping update %p (version %p)", update, update->version); continue; } - // If the update's version is tagged then either the update was aborted or the the version number is + // If the update's version is tagged then either the update was aborted or the the version number is // actually a pointer to a running transaction's txn_t. // Skip updates from aborted transactions. - if (EXPECT_FALSE(update->version == ABORTED_VERSION)) + if (EXPECT_FALSE(update->version == ABORTED_VERSION)) { + TRACE("x2", "txn_map_get: skipping aborted update %p", update, 0); continue; + } // The update's transaction is still in progress. Access its txn_t. - txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1); - if (writer == txn) // found our own update - break; // success + txn_t *writer = (txn_t *)VAL_TO_PTR(update->version); + if (writer == txn) { + TRACE("x2", "txn_map_get: found txn's own update %p", update, 0); + break; // success + } txn_state_e writer_state = writer->state; - if (writer_state == TXN_RUNNING) - continue; + if (writer_state == TXN_RUNNING) { + TRACE("x2", "txn_map_get: skipping update %p of in-progress transaction %p", update, writer); + continue; + } if (writer_state == TXN_VALIDATING) { + TRACE("x2", "txn_map_get: update %p transaction %p validating", update, writer); if (writer->wv > txn->rv) continue; writer_state = txn_validate(writer); } // Skip updates from aborted transactions. - if (writer_state == TXN_ABORTED) + if (writer_state == TXN_ABORTED) { + TRACE("x2", "txn_map_get: skipping aborted update %p", update, 0); continue; + } assert(writer_state == TXN_VALIDATED); - if (writer->wv > txn->rv) + if (writer->wv > txn->rv) { + TRACE("x2", "txn_map_get: skipping update %p (version %p)", update, update->version); continue; + } break; // success } + if (update == NULL) { + TRACE("x1", "txn_map_get: key does not exist in map", key, 0); + return DOES_NOT_EXIST; + } + map_val_t value = update->value; + TRACE("x1", "txn_map_get: key found returning value %p", value, 0); + return value; // collect some garbage version_t min_active_version = UNDETERMINED_VERSION; update_t *next_update = NULL; if (IS_TAGGED(update->next, TAG2)) { - next_update = (update_t *)STRIP_TAG(update->next, TAG2); + next_update = VAL_TO_PTR(update->next); + + // If (and all update records following it [execpt if it is aborted]) is old enough + // that it is not visible to any active transaction we can safely free it. min_active_version = (version_t)sl_min_key(active_); if (next_update->version < min_active_version) { - // (and all update records following it [execpt if it is aborted]) is old enough that it is - // not visible to any active transaction. We can safely free it. - // Skip over aborted versions to look for more recent updates + // If the is aborted, skip over it to look for more recent ones that may follow update_t *temp = next_update; while (temp->version == ABORTED_VERSION) { assert(!IS_TAGGED(temp->version, TAG1)); - map_val_t next = next_update->next; + map_val_t next = temp->next; if (!IS_TAGGED(next, TAG2)) break; - temp = (update_t *)STRIP_TAG(next, TAG2); - if (temp->version >= min_active_version) + // Bail out of garbage collection if we find a record that might still be accessed by an + // ongoing transaction. + if (VAL_TO_PTR(next)->version >= min_active_version) return value; + + temp = VAL_TO_PTR(next); } - // free and all the update records following it + // free the next update record and all the ones following it temp = next_update; - while (1) { - map_val_t next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST); + map_val_t next; + do { + next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST); // if we find ourself in a race just back off and let the other thread take care of it - if (next == DOES_NOT_EXIST) + if (next == DOES_NOT_EXIST) return value; - if (!IS_TAGGED(next, TAG2)) - break; + nbd_free(temp); - temp = (update_t *)STRIP_TAG(next, TAG2); - nbd_free(update); - } + temp = VAL_TO_PTR(next); + + } while (IS_TAGGED(next, TAG2)); } } @@ -346,25 +385,36 @@ map_val_t txn_map_get (txn_t *txn, map_key_t key) { } } } - + return value; } void txn_map_set (txn_t *txn, map_key_t key, map_val_t value) { - if (txn->state != TXN_RUNNING) - return; // TODO: return some sort of error code + TRACE("x1", "txn_map_set: txn %p map %p", txn, txn->map); + TRACE("x1", "txn_map_set: key %p value %p", key, value); + assert(!IS_TAGGED(value, TAG1) && !IS_TAGGED(value, TAG2)); + + if (txn->state != TXN_RUNNING) { + TRACE("x1", "txn_map_set: error txn not running (state %p)", txn->state, 0); + return; + } // create a new update record - update_t *update = alloc_update_rec(); - update->value = value; - update->version = TAG_VALUE((version_t)txn, TAG1); + version_t ver = TAG_VALUE(PTR_TO_VAL(txn), TAG1); // tagged versions are txn_t pointers + update_t *update = alloc_update_rec(ver, value); // push the new update record onto 's update list - map_val_t old_update; + map_val_t old_update = map_get(txn->map, key); + TRACE("x2", "txn_map_set: old update %p new update record %p", old_update, update); do { - old_update = map_get(txn->map, key); update->next = old_update; - } while (map_cas(txn->map, key, old_update, TAG_VALUE((map_val_t)update, TAG2)) != old_update); + map_val_t temp = map_cas(txn->map, key, old_update, TAG_VALUE(PTR_TO_VAL(update), TAG2)); + if (temp == old_update) + break; + + TRACE("x1", "txn_map_set: cas failed; found %p expected %p", temp, old_update); + old_update = temp; + } while (1); // add to the write set for commit-time validation if (txn->writes_count == txn->writes_size) { -- 2.40.0