From a03cf3b0c40e6c3b8b4877b49a64288cb3fcb919 Mon Sep 17 00:00:00 2001 From: jdybnis Date: Mon, 22 Dec 2008 00:52:20 +0000 Subject: [PATCH] add hazard pointer implementation. buggy --- include/common.h | 4 -- include/hazard.h | 25 +++++++ include/mem.h | 1 - include/runtime.h | 12 ++-- makefile | 7 +- map/hashtable.c | 7 +- map/list.c | 99 +++++++++++++++++++++------- map/skiplist.c | 11 ++-- runtime/hazard.c | 164 ++++++++++++++++++++++++++++++++++++++++++++++ runtime/mem.c | 10 ++- runtime/rcu.c | 8 ++- runtime/runtime.c | 2 +- test/haz_test.c | 117 +++++++++++++++++++++++++++++++++ test/map_test1.c | 4 +- test/map_test2.c | 6 +- test/txn_test.c | 1 - todo | 6 +- txn/txn.c | 11 ++-- 18 files changed, 431 insertions(+), 64 deletions(-) create mode 100644 include/hazard.h create mode 100644 runtime/hazard.c create mode 100644 test/haz_test.c diff --git a/include/common.h b/include/common.h index a4fc7e3..c644bb5 100644 --- a/include/common.h +++ b/include/common.h @@ -11,10 +11,6 @@ #include #include -#define malloc "DON'T USE MALLOC" // use nbd_malloc() instead -#define calloc "DON'T USE CALLOC" // use nbd_malloc() instead -#define free "DON'T USE FREE" // use nbd_free() instead - #define MAX_NUM_THREADS 4 // make this whatever you want, but make it a power of 2 #define CACHE_LINE_SIZE 64 diff --git a/include/hazard.h b/include/hazard.h new file mode 100644 index 0000000..938a3bd --- /dev/null +++ b/include/hazard.h @@ -0,0 +1,25 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + * + * hazard pointers + * + * www.research.ibm.com/people/m/michael/ieeetpds-2004.pdf + * + */ +#ifndef HAZARD_H +#define HAZARD_H + +#define STATIC_HAZ_PER_THREAD 2 + +typedef void (*free_t) (void *); +typedef void *haz_t; + +static inline void haz_set (haz_t *haz, void *x) { *haz = x; __asm__ __volatile__("mfence"); } + +haz_t *haz_get_static (int n); +void haz_register_dynamic (haz_t *haz); +void haz_unregister_dynamic (haz_t *haz); +void haz_defer_free (void *p, free_t f); + +#endif diff --git a/include/mem.h b/include/mem.h index 3e1d940..a8a0de4 100644 --- a/include/mem.h +++ b/include/mem.h @@ -6,5 +6,4 @@ #define MEM_H void *nbd_malloc (size_t n); void nbd_free (void *x); -void nbd_defer_free (void *x); #endif//MEM_H diff --git a/include/runtime.h b/include/runtime.h index 9ba7657..5a87c88 100644 --- a/include/runtime.h +++ b/include/runtime.h @@ -2,12 +2,14 @@ * Written by Josh Dybnis and released to the public domain, as explained at * http://creativecommons.org/licenses/publicdomain */ -#ifndef THREADS_H -#define THREADS_H +#ifndef RUNTIME_H +#define RUNTIME_H + +#include "tls.h" + +extern DECLARE_THREAD_LOCAL(tid_, int); -void nbd_init (void); int nbd_thread_create (pthread_t *restrict thread, int thread_id, void *(*start_routine)(void *), void *restrict arg); int nbd_rand (void); -void rcu_update (void); -#endif//THREADS_H +#endif//RUNTIME_H diff --git a/makefile b/makefile index 9b71cf7..ebcaae6 100644 --- a/makefile +++ b/makefile @@ -5,14 +5,15 @@ # Makefile for building programs with whole-program interfile optimization ################################################################################################### OPT := -fwhole-program -combine -03 #-DNDEBUG -CFLAGS := -g -Wall -Werror -std=c99 $(OPT) -m64 -DTEST_STRING_KEYS #-DNBD32 #-DENABLE_TRACE +CFLAGS := -g -Wall -Werror -std=c99 $(OPT) -m64 #-DLIST_USE_HAZARD_POINTER -DENABLE_TRACE #-DTEST_STRING_KEYS #-DNBD32 INCS := $(addprefix -I, include) -TESTS := output/map_test1 output/map_test2 output/txn_test +TESTS := output/map_test2 output/map_test1 output/txn_test EXES := $(TESTS) -RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c datatype/nstring.c +RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c datatype/nstring.c runtime/hazard.c MAP_SRCS := map/map.c map/list.c map/skiplist.c map/hashtable.c +haz_test_SRCS := $(RUNTIME_SRCS) test/haz_test.c rcu_test_SRCS := $(RUNTIME_SRCS) test/rcu_test.c txn_test_SRCS := $(RUNTIME_SRCS) $(MAP_SRCS) test/txn_test.c test/CuTest.c txn/txn.c map_test1_SRCS := $(RUNTIME_SRCS) $(MAP_SRCS) test/map_test1.c diff --git a/map/hashtable.c b/map/hashtable.c index a991646..ba62692 100644 --- a/map/hashtable.c +++ b/map/hashtable.c @@ -16,6 +16,7 @@ #include "common.h" #include "murmur.h" #include "mem.h" +#include "rcu.h" #include "hashtable.h" #ifndef NBD32 @@ -498,11 +499,11 @@ static void hti_defer_free (hti_t *hti) { continue; assert(!IS_TAGGED(val, TAG1) || val == TAG_VALUE(TOMBSTONE, TAG1)); // copy not in progress if (hti->ht->key_type != NULL && key != DOES_NOT_EXIST) { - nbd_defer_free(GET_PTR(key)); + rcu_defer_free(GET_PTR(key)); } } - nbd_defer_free((void *)hti->table); - nbd_defer_free(hti); + rcu_defer_free((void *)hti->table); + rcu_defer_free(hti); } static void hti_release (hti_t *hti) { diff --git a/map/list.c b/map/list.c index f91b418..0d9c11f 100644 --- a/map/list.c +++ b/map/list.c @@ -12,6 +12,11 @@ #include "common.h" #include "list.h" #include "mem.h" +#ifdef LIST_USE_HAZARD_POINTER +#include "hazard.h" +#else +#include "rcu.h" +#endif typedef struct node { map_key_t key; @@ -20,7 +25,7 @@ typedef struct node { } node_t; struct ll_iter { - node_t *next; + node_t *pred; }; struct ll { @@ -53,6 +58,9 @@ void ll_free (list_t *ll) { node_t *item = STRIP_MARK(ll->head->next); while (item != NULL) { node_t *next = STRIP_MARK(item->next); + if (ll->key_type != NULL) { + nbd_free((void *)item->key); + } nbd_free(item); item = next; } @@ -70,12 +78,27 @@ size_t ll_count (list_t *ll) { return count; } +#ifdef LIST_USE_HAZARD_POINTER +static void nbd_free_node (node_t *x) { + nbd_free((void *)x->key); + nbd_free(x); +} +#endif + static int find_pred (node_t **pred_ptr, node_t **item_ptr, list_t *ll, map_key_t key, int help_remove) { node_t *pred = ll->head; node_t *item = GET_NODE(pred->next); TRACE("l2", "find_pred: searching for key %p in list (head is %p)", key, pred); +#ifdef LIST_USE_HAZARD_POINTER + haz_t *temp, *hp0 = haz_get_static(0), *hp1 = haz_get_static(1); +#endif while (item != NULL) { +#ifdef LIST_USE_HAZARD_POINTER + haz_set(hp0, item); + if (STRIP_MARK(pred->next) != item) + return find_pred(pred_ptr, item_ptr, ll, key, help_remove); // retry +#endif markable_t next = item->next; // A mark means the node is logically removed but not physically unlinked yet. @@ -102,11 +125,15 @@ static int find_pred (node_t **pred_ptr, node_t **item_ptr, list_t *ll, map_key_ TRACE("l3", "find_pred: now current item is %p next is %p", item, next); // The thread that completes the unlink should free the memory. - node_t *unlinked = GET_NODE(other); +#ifdef LIST_USE_HAZARD_POINTER + free_t free_ = (ll->key_type != NULL ? (free_t)nbd_free_node : nbd_free); + haz_defer_free(GET_NODE(other), free_); +#else if (ll->key_type != NULL) { - nbd_defer_free((void *)unlinked->key); + rcu_defer_free((void *)GET_NODE(other)->key); } - nbd_defer_free(unlinked); + rcu_defer_free(GET_NODE(other)); +#endif } else { TRACE("l2", "find_pred: lost a race to unlink item %p from pred %p", item, pred); TRACE("l2", "find_pred: pred's link changed to %p", other, 0); @@ -135,7 +162,9 @@ static int find_pred (node_t **pred_ptr, node_t **item_ptr, list_t *ll, map_key_ if (pred_ptr != NULL) { *pred_ptr = pred; } - *item_ptr = item; + if (item_ptr != NULL) { + *item_ptr = item; + } if (d == 0) { TRACE("l2", "find_pred: found matching item %p in list, pred is %p", item, pred); return TRUE; @@ -145,6 +174,9 @@ static int find_pred (node_t **pred_ptr, node_t **item_ptr, list_t *ll, map_key_ } pred = item; +#ifdef LIST_USE_HAZARD_POINTER + temp = hp0; hp0 = hp1; hp1 = temp; +#endif item = GET_NODE(next); } @@ -283,10 +315,15 @@ map_val_t ll_remove (list_t *ll, map_key_t key) { } // The thread that completes the unlink should free the memory. +#ifdef LIST_USE_HAZARD_POINTER + free_t free_ = (ll->key_type != NULL ? (free_t)nbd_free_node : nbd_free); + haz_defer_free(GET_NODE(item), free_); +#else if (ll->key_type != NULL) { - nbd_defer_free((void *)item->key); + rcu_defer_free((void *)item->key); } - nbd_defer_free(item); + rcu_defer_free(item); +#endif TRACE("l1", "ll_remove: successfully unlinked item %p from the list", item, 0); return val; } @@ -295,13 +332,10 @@ void ll_print (list_t *ll) { markable_t next = ll->head->next; int i = 0; while (next != DOES_NOT_EXIST) { - if (HAS_MARK(next)) { - printf("*"); - } node_t *item = STRIP_MARK(next); if (item == NULL) break; - printf("%p:0x%llx ", item, (uint64_t)item->key); + printf("%s%p:0x%llx ", HAS_MARK(item->next) ? "*" : "", item, (uint64_t)item->key); fflush(stdout); if (i++ > 30) { printf("..."); @@ -315,30 +349,49 @@ void ll_print (list_t *ll) { ll_iter_t *ll_iter_begin (list_t *ll, map_key_t key) { ll_iter_t *iter = (ll_iter_t *)nbd_malloc(sizeof(ll_iter_t)); if (key != DOES_NOT_EXIST) { - find_pred(NULL, &iter->next, ll, key, FALSE); + find_pred(&iter->pred, NULL, ll, key, FALSE); } else { - iter->next = GET_NODE(ll->head->next); + iter->pred = ll->head; } +#ifdef LIST_USE_HAZARD_POINTER + haz_register_dynamic((void **)&iter->pred); +#endif return iter; } map_val_t ll_iter_next (ll_iter_t *iter, map_key_t *key_ptr) { assert(iter); - node_t *item = iter->next; - while (item != NULL && HAS_MARK(item->next)) { - item = STRIP_MARK(item->next); - } - if (item == NULL) { - iter->next = NULL; + if (iter->pred == NULL) return DOES_NOT_EXIST; - } - iter->next = STRIP_MARK(item->next); + + // advance iterator to next item; skip items that have been removed + markable_t item; +#ifdef LIST_USE_HAZARD_POINTER + haz_t *hp0 = haz_get_static(0); +#endif + do { +#ifndef LIST_USE_HAZARD_POINTER + item = iter->pred->next; +#else //LIST_USE_HAZARD_POINTER + do { + item = iter->pred->next; + haz_set(hp0, STRIP_MARK(item)); + } while (item != ((volatile node_t *)iter->pred)->next); +#endif//LIST_USE_HAZARD_POINTER + iter->pred = STRIP_MARK(item); + if (iter->pred == NULL) + return DOES_NOT_EXIST; + } while (HAS_MARK(item)); + if (key_ptr != NULL) { - *key_ptr = item->key; + *key_ptr = GET_NODE(item)->key; } - return item->val; + return GET_NODE(item)->val; } void ll_iter_free (ll_iter_t *iter) { +#ifdef LIST_USE_HAZARD_POINTER + haz_unregister_dynamic((void **)&iter->pred); +#endif nbd_free(iter); } diff --git a/map/skiplist.c b/map/skiplist.c index e9c9b55..9887a43 100644 --- a/map/skiplist.c +++ b/map/skiplist.c @@ -21,9 +21,10 @@ #include #include "common.h" -#include "runtime.h" #include "skiplist.h" +#include "runtime.h" #include "mem.h" +#include "rcu.h" // Setting MAX_LEVEL to 0 essentially makes this data structure the Harris-Michael lock-free list (in list.c). #define MAX_LEVEL 31 @@ -170,9 +171,9 @@ static node_t *find_preds (node_t **preds, node_t **succs, int n, skiplist_t *sl if (level == 0) { node_t *unlinked = GET_NODE(other); if (sl->key_type != NULL) { - nbd_defer_free((void *)unlinked->key); + rcu_defer_free((void *)unlinked->key); } - nbd_defer_free(unlinked); + rcu_defer_free(unlinked); } } else { TRACE("s3", "find_preds: lost race to unlink item %p from pred %p", item, pred); @@ -438,9 +439,9 @@ map_val_t sl_remove (skiplist_t *sl, map_key_t key) { TRACE("s2", "sl_remove: unlinked item %p from the skiplist at level 0", item, 0); // The thread that completes the unlink should free the memory. if (sl->key_type != NULL) { - nbd_defer_free((void *)item->key); + rcu_defer_free((void *)item->key); } - nbd_defer_free(item); + rcu_defer_free(item); } return val; } diff --git a/runtime/hazard.c b/runtime/hazard.c new file mode 100644 index 0000000..72ed0c1 --- /dev/null +++ b/runtime/hazard.c @@ -0,0 +1,164 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + * + * hazard pointers + * + * www.research.ibm.com/people/m/michael/ieeetpds-2004.pdf + * + */ +#include "common.h" +#include "lwt.h" +#include "mem.h" +#include "tls.h" +#include "runtime.h" +#include "hazard.h" + +typedef struct pending { + void * ptr; + free_t free_; +} pending_t; + +typedef struct haz_local { + pending_t *pending; // to be freed + int pending_size; + int pending_count; + + haz_t static_haz[STATIC_HAZ_PER_THREAD]; + + haz_t **dynamic; + int dynamic_size; + int dynamic_count; + +} haz_local_t; + +static haz_local_t haz_local_[MAX_NUM_THREADS] = {}; + +static void sort_hazards (haz_t *hazards, int n) { + return; +} + +static int search_hazards (void *p, haz_t *hazards, int n) { + for (int i = 0; i < n; ++i) { + if (hazards[i] == p) + return TRUE; + } + return FALSE; +} + +static void resize_pending (void) { + LOCALIZE_THREAD_LOCAL(tid_, int); + haz_local_t *l = haz_local_ + tid_; + pending_t *p = nbd_malloc(sizeof(pending_t) * l->pending_size * 2); + memcpy(p, l->pending, l->pending_size); + nbd_free(l->pending); + l->pending = p; + l->pending_size *= 2; +} + +void haz_defer_free (void *d, free_t f) { + assert(d); + assert(f); + LOCALIZE_THREAD_LOCAL(tid_, int); + haz_local_t *l = haz_local_ + tid_; + while (l->pending_count == l->pending_size) { + + if (l->pending_size == 0) { + l->pending_size = MAX_NUM_THREADS * STATIC_HAZ_PER_THREAD; + l->pending = nbd_malloc(sizeof(pending_t) * l->pending_size); + break; + } + + // scan for hazard pointers + haz_t *hazards = nbd_malloc(sizeof(haz_t) * l->pending_size); + int hazard_count = 0; + for (int i = 0; i < MAX_NUM_THREADS; ++i) { + haz_local_t *h = haz_local_ + i; + for (int j = 0; j < STATIC_HAZ_PER_THREAD; ++j) { + if (h->static_haz[j] != NULL) { + if (hazard_count == l->pending_size) { + resize_pending(); + nbd_free(hazards); + haz_defer_free(d, f); + return; + } + hazards[hazard_count++] = h->static_haz[j]; + } + } + for (int j = 0; j < h->dynamic_count; ++j) { + if (h->dynamic[j] != NULL && *h->dynamic[j] != NULL) { + if (hazard_count == l->pending_size) { + resize_pending(); + nbd_free(hazards); + haz_defer_free(d, f); + return; + } + hazards[hazard_count++] = *h->dynamic[j]; + } + } + } + sort_hazards(hazards, hazard_count); + + // check for conflicts + int conflicts_count = 0; + for (int i = 0; i < l->pending_count; ++i) { + pending_t *p = l->pending + i; + if (search_hazards(p->ptr, hazards, hazard_count)) { + l->pending[conflicts_count++] = *p; // put conflicts back on the pending list + } else { + assert(p->free_); + assert(p->ptr); + p->free_(p->ptr); // free pending item + } + } + l->pending_count = conflicts_count; + nbd_free(hazards); + } + l->pending[ l->pending_count ].ptr = d; + l->pending[ l->pending_count ].free_ = f; + l->pending_count++; +} + +haz_t *haz_get_static (int i) { + if (i >= STATIC_HAZ_PER_THREAD) + return NULL; + LOCALIZE_THREAD_LOCAL(tid_, int); + return &haz_local_[tid_].static_haz[i]; +} + +void haz_register_dynamic (haz_t *haz) { + LOCALIZE_THREAD_LOCAL(tid_, int); + haz_local_t *l = haz_local_ + tid_; + + if (l->dynamic_size == 0) { + l->dynamic_size = MAX_NUM_THREADS * STATIC_HAZ_PER_THREAD; + l->dynamic = nbd_malloc(sizeof(haz_t *) * l->dynamic_size); + } + + if (l->dynamic_count == l->dynamic_size) { + haz_t **d = nbd_malloc(sizeof(haz_t *) * l->dynamic_size * 2); + memcpy(d, l->dynamic, l->dynamic_size); + nbd_free(l->dynamic); + l->dynamic = d; + l->dynamic_size *= 2; + } + + l->dynamic[ l->dynamic_count++ ] = haz; +} + +// assumes was registered in the same thread +void haz_unregister_dynamic (void **haz) { + LOCALIZE_THREAD_LOCAL(tid_, int); + haz_local_t *l = haz_local_ + tid_; + + for (int i = 0; i < l->dynamic_count; ++i) { + if (l->dynamic[i] == haz) { + if (i != l->dynamic_count - 1) { + l->dynamic[i] = l->dynamic[ l->dynamic_count ]; + } + l->dynamic_count--; + return; + } + } + assert(0); +} diff --git a/runtime/mem.c b/runtime/mem.c index 0b7cd5b..1787a62 100644 --- a/runtime/mem.c +++ b/runtime/mem.c @@ -11,7 +11,7 @@ #include "rlocal.h" #include "lwt.h" -#define GET_SCALE(n) (sizeof(n)*8-__builtin_clzl((n)-1)) // log2 of , rounded up +#define GET_SCALE(n) (sizeof(void *)*__CHAR_BIT__ - __builtin_clzl((n) - 1)) // log2 of , rounded up #define MAX_SCALE 31 // allocate blocks up to 4GB in size (arbitrary, could be bigger) #define REGION_SCALE 22 // 4MB regions #define REGION_SIZE (1 << REGION_SCALE) @@ -67,6 +67,9 @@ void nbd_free (void *x) { block_t *b = (block_t *)x; assert(((size_t)b >> REGION_SCALE) < ((1 << HEADER_REGION_SCALE) / sizeof(header_t))); header_t *h = region_header_ + ((size_t)b >> REGION_SCALE); +#ifndef NDEBUG + memset(b, 0xcd, (1 << h->scale)); +#endif TRACE("m0", "nbd_free(): block %p scale %llu", b, h->scale); if (h->owner == tid_) { TRACE("m0", "nbd_free(): private block, free list head %p", @@ -76,8 +79,9 @@ void nbd_free (void *x) { } else { TRACE("m0", "nbd_free(): owner %llu free list head %p", h->owner, pub_free_list_[h->owner][h->scale][tid_]); - b->next = pub_free_list_[h->owner][h->scale][tid_]; - pub_free_list_[h->owner][h->scale][tid_] = b; + do { + b->next = pub_free_list_[h->owner][h->scale][tid_]; + } while (SYNC_CAS(&pub_free_list_[h->owner][h->scale][tid_], b->next, b) != b->next); } } diff --git a/runtime/rcu.c b/runtime/rcu.c index 5ae851b..50205e8 100644 --- a/runtime/rcu.c +++ b/runtime/rcu.c @@ -12,6 +12,7 @@ #include "lwt.h" #include "mem.h" #include "tls.h" +#include "rcu.h" #define RCU_POST_THRESHOLD 10 #define RCU_QUEUE_SCALE 20 @@ -71,17 +72,18 @@ void rcu_update (void) { } } -void nbd_defer_free (void *x) { +void rcu_defer_free (void *x) { + assert(x); LOCALIZE_THREAD_LOCAL(tid_, int); fifo_t *q = pending_[tid_]; assert(MOD_SCALE(q->head + 1, q->scale) != MOD_SCALE(q->tail, q->scale)); uint32_t i = MOD_SCALE(q->head++, q->scale); q->x[i] = x; - TRACE("r0", "nbd_defer_free: put %p on queue at position %llu", x, pending_[tid_]->head); + TRACE("r0", "rcu_defer_free: put %p on queue at position %llu", x, pending_[tid_]->head); if (pending_[tid_]->head - rcu_last_posted_[tid_][tid_] < RCU_POST_THRESHOLD) return; - TRACE("r0", "nbd_defer_free: posting %llu", pending_[tid_]->head, 0); + TRACE("r0", "rcu_defer_free: posting %llu", pending_[tid_]->head, 0); int next_thread_id = (tid_ + 1) % num_threads_; rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head; } diff --git a/runtime/runtime.c b/runtime/runtime.c index d4946cf..14a09f9 100644 --- a/runtime/runtime.c +++ b/runtime/runtime.c @@ -18,7 +18,7 @@ typedef struct thread_info { void *restrict arg; } thread_info_t; -void nbd_init (void) { +__attribute__ ((constructor)) void nbd_init (void) { sranddev(); INIT_THREAD_LOCAL(rand_seed_); INIT_THREAD_LOCAL(tid_); diff --git a/test/haz_test.c b/test/haz_test.c new file mode 100644 index 0000000..9e6f1f2 --- /dev/null +++ b/test/haz_test.c @@ -0,0 +1,117 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + * + * hazard pointer test + * + */ +#include +#include +#include +#include +#include +#include +#include "common.h" +#include "mem.h" +#include "runtime.h" +#include "hazard.h" + +#define NUM_ITERATIONS 10000000 +#define MAX_NUM_THREADS 4 + +typedef struct node { + struct node *next; +} node_t; + +typedef struct lifo { + node_t *head; +} lifo_t; + +static volatile int wait_; +static lifo_t *stk_; + +void *worker (void *arg) { + int id = (int)(size_t)arg; + unsigned int r = (unsigned int)(id + 1) * 0x5bd1e995; // seed "random" number generator + haz_t *hp0 = haz_get_static(0); + + // Wait for all the worker threads to be ready. + __sync_fetch_and_add(&wait_, -1); + do {} while (wait_); + + int i; + for (i = 0; i < NUM_ITERATIONS; ++ i) { + r ^= r << 6; r ^= r >> 21; r ^= r << 7; // generate next "random" number + if (r & 0x1000) { + // push + node_t *new_head = (node_t *)nbd_malloc(sizeof(node_t)); + node_t *old_head = stk_->head; + node_t *temp; + do { + temp = old_head; + new_head->next = temp; + } while ((old_head = __sync_val_compare_and_swap(&stk_->head, temp, new_head)) != temp); + } else { + // pop + node_t *temp; + node_t *head = stk_->head; + do { + temp = head; + if (temp == NULL) + break; + haz_set(hp0, temp); + head = ((volatile lifo_t *)stk_)->head; + if (temp != head) + continue; + } while ((head = __sync_val_compare_and_swap(&stk_->head, temp, temp->next)) != temp); + + if (temp != NULL) { + haz_defer_free(temp, nbd_free); + } + } + } + + return NULL; +} + +int main (int argc, char **argv) { + //lwt_set_trace_level("m0r0"); + + int num_threads = MAX_NUM_THREADS; + if (argc == 2) + { + errno = 0; + num_threads = strtol(argv[1], NULL, 10); + if (errno) { + fprintf(stderr, "%s: Invalid argument for number of threads\n", argv[0]); + return -1; + } + if (num_threads <= 0) { + fprintf(stderr, "%s: Number of threads must be at least 1\n", argv[0]); + return -1; + } + } + + stk_ = (lifo_t *)nbd_malloc(sizeof(lifo_t)); + memset(stk_, 0, sizeof(lifo_t)); + + struct timeval tv1, tv2; + gettimeofday(&tv1, NULL); + wait_ = num_threads; + + pthread_t thread[num_threads]; + for (int i = 0; i < num_threads; ++i) { + int rc = nbd_thread_create(thread + i, i, worker, (void *)(size_t)i); + if (rc != 0) { perror("pthread_create"); return rc; } + } + for (int i = 0; i < num_threads; ++i) { + pthread_join(thread[i], NULL); + } + + gettimeofday(&tv2, NULL); + int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000; + printf("Th:%d Time:%dms\n\n", num_threads, ms); + fflush(stdout); + + return 0; +} diff --git a/test/map_test1.c b/test/map_test1.c index a6a7192..2e9ef85 100644 --- a/test/map_test1.c +++ b/test/map_test1.c @@ -7,6 +7,7 @@ #include "nstring.h" #include "runtime.h" #include "map.h" +#include "rcu.h" #include "list.h" #include "skiplist.h" #include "hashtable.h" @@ -55,7 +56,6 @@ void *worker (void *arg) { } int main (int argc, char **argv) { - nbd_init(); lwt_set_trace_level("r0m0l3"); char* program_name = argv[0]; @@ -66,7 +66,7 @@ int main (int argc, char **argv) { return -1; } - num_threads_ = 1; + num_threads_ = MAX_NUM_THREADS; if (argc == 2) { errno = 0; diff --git a/test/map_test2.c b/test/map_test2.c index f9444ca..9fc6fe3 100644 --- a/test/map_test2.c +++ b/test/map_test2.c @@ -20,6 +20,7 @@ #include "hashtable.h" #include "lwt.h" #include "mem.h" +#include "rcu.h" #define ASSERT_EQUAL(x, y) CuAssertIntEquals(tc, x, y) @@ -316,8 +317,7 @@ void big_iteration_test (CuTest* tc) { int main (void) { - nbd_init(); - lwt_set_trace_level("h3"); + lwt_set_trace_level("l3"); static const map_impl_t *map_types[] = { &ll_map_impl, &sl_map_impl, &ht_map_impl }; for (int i = 0; i < sizeof(map_types)/sizeof(*map_types); ++i) { @@ -327,10 +327,10 @@ int main (void) { CuString *output = CuStringNew(); CuSuite* suite = CuSuiteNew(); + SUITE_ADD_TEST(suite, concurrent_add_remove_test); SUITE_ADD_TEST(suite, basic_test); SUITE_ADD_TEST(suite, basic_iteration_test); SUITE_ADD_TEST(suite, big_iteration_test); - SUITE_ADD_TEST(suite, concurrent_add_remove_test); CuSuiteRun(suite); CuSuiteDetails(suite, output); diff --git a/test/txn_test.c b/test/txn_test.c index 5a7606a..8ad8096 100644 --- a/test/txn_test.c +++ b/test/txn_test.c @@ -26,7 +26,6 @@ void test1 (CuTest* tc) { int main (void) { - nbd_init(); txn_init(); CuString *output = CuStringNew(); diff --git a/todo b/todo index d275543..dbdab12 100644 --- a/todo +++ b/todo @@ -11,13 +11,15 @@ memory manangement ------------------ -- make rcu yield when its buffer gets full instead of throwing an assert +- allow threads to dynamically enter and exit rcu's token ring +- augment rcu with heartbeat manager to kill stalled threads +- make rcu try yielding when its buffer gets full - alternate memory reclamation schemes: hazard pointers and/or reference counting - seperate nbd_malloc/nbd_free into general purpose malloc/free replacement quality ------- -- verify the key management in list, skiplist, and hashtable +- verify the key memory management in list, skiplist, and hashtable - transaction tests - port perf tests from lib-high-scale - characterize the performance of hashtable vs. skiplist vs. list diff --git a/txn/txn.c b/txn/txn.c index 5648d16..1cc8e2f 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -5,6 +5,7 @@ #include "common.h" #include "txn.h" #include "mem.h" +#include "rcu.h" #include "skiplist.h" #define UNDETERMINED_VERSION 0 @@ -196,8 +197,8 @@ void txn_abort (txn_t *txn) { update->version = ABORTED_VERSION; } - nbd_defer_free(txn->writes); - nbd_defer_free(txn); + rcu_defer_free(txn->writes); + rcu_defer_free(txn); } txn_state_e txn_commit (txn_t *txn) { @@ -228,8 +229,8 @@ txn_state_e txn_commit (txn_t *txn) { } } while (old_count != temp); - nbd_defer_free(txn->writes); - nbd_defer_free(txn); + rcu_defer_free(txn->writes); + rcu_defer_free(txn); return state; } @@ -341,7 +342,7 @@ map_val_t txn_map_get (txn_t *txn, map_key_t key) { } if (update->version <= min_active_version) { if (map_cas(txn->map, key, TAG_VALUE(val, TAG2), value) == TAG_VALUE(val, TAG2)) { - nbd_defer_free(update); + rcu_defer_free(update); } } } -- 2.40.0