add hazard pointer implementation. buggy
authorjdybnis <jdybnis@9ec2166a-aeea-11dd-8830-69e4bb380a4a>
Mon, 22 Dec 2008 00:52:20 +0000 (00:52 +0000)
committerjdybnis <jdybnis@9ec2166a-aeea-11dd-8830-69e4bb380a4a>
Mon, 22 Dec 2008 00:52:20 +0000 (00:52 +0000)
18 files changed:
include/common.h
include/hazard.h [new file with mode: 0644]
include/mem.h
include/runtime.h
makefile
map/hashtable.c
map/list.c
map/skiplist.c
runtime/hazard.c [new file with mode: 0644]
runtime/mem.c
runtime/rcu.c
runtime/runtime.c
test/haz_test.c [new file with mode: 0644]
test/map_test1.c
test/map_test2.c
test/txn_test.c
todo
txn/txn.c

index a4fc7e3c0e27459d777821cfe59411e2b97c56a8..c644bb5631f0ab480c0e756f62bb53aa4118f4dd 100644 (file)
 #include <string.h>
 #include <sys/types.h>
 
-#define malloc "DON'T USE MALLOC" // use nbd_malloc() instead
-#define calloc "DON'T USE CALLOC" // use nbd_malloc() instead
-#define free   "DON'T USE FREE"   // use nbd_free() instead
-
 #define MAX_NUM_THREADS 4 // make this whatever you want, but make it a power of 2
 
 #define CACHE_LINE_SIZE 64
diff --git a/include/hazard.h b/include/hazard.h
new file mode 100644 (file)
index 0000000..938a3bd
--- /dev/null
@@ -0,0 +1,25 @@
+/* 
+ * Written by Josh Dybnis and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ *
+ * hazard pointers
+ *
+ * www.research.ibm.com/people/m/michael/ieeetpds-2004.pdf
+ *
+ */
+#ifndef HAZARD_H
+#define HAZARD_H
+
+#define STATIC_HAZ_PER_THREAD 2
+
+typedef void (*free_t) (void *);
+typedef void *haz_t;
+
+static inline void haz_set (haz_t *haz, void *x) { *haz = x; __asm__ __volatile__("mfence"); }
+
+haz_t *haz_get_static         (int n);
+void   haz_register_dynamic   (haz_t *haz);
+void   haz_unregister_dynamic (haz_t *haz);
+void   haz_defer_free         (void *p, free_t f);
+
+#endif
index 3e1d940fa12a776ab0cd4e7abaa64b2059e295ab..a8a0de4c2feaf27ccb822f1ddbd6a138037d3ed0 100644 (file)
@@ -6,5 +6,4 @@
 #define MEM_H
 void *nbd_malloc (size_t n);
 void nbd_free (void *x);
-void nbd_defer_free (void *x);
 #endif//MEM_H
index 9ba7657d92f5d800bcc4381ff98ea177643706f9..5a87c883b248c490096474701be66c2343301c02 100644 (file)
@@ -2,12 +2,14 @@
  * Written by Josh Dybnis and released to the public domain, as explained at
  * http://creativecommons.org/licenses/publicdomain
  */
-#ifndef THREADS_H
-#define THREADS_H
+#ifndef RUNTIME_H
+#define RUNTIME_H
+
+#include "tls.h"
+
+extern DECLARE_THREAD_LOCAL(tid_, int);
 
-void nbd_init (void);
 int nbd_thread_create (pthread_t *restrict thread, int thread_id, void *(*start_routine)(void *), void *restrict arg);
 int nbd_rand (void);
-void rcu_update (void);
 
-#endif//THREADS_H
+#endif//RUNTIME_H
index 9b71cf718dcab10667557e65818ac0c766516d6a..ebcaae63cf06fe3e60eefca8c779fbe5efc0b532 100644 (file)
--- a/makefile
+++ b/makefile
@@ -5,14 +5,15 @@
 # Makefile for building programs with whole-program interfile optimization
 ###################################################################################################
 OPT       := -fwhole-program -combine -03 #-DNDEBUG
-CFLAGS := -g -Wall -Werror -std=c99 $(OPT) -m64 -DTEST_STRING_KEYS #-DNBD32 #-DENABLE_TRACE
+CFLAGS := -g -Wall -Werror -std=c99 $(OPT) -m64 #-DLIST_USE_HAZARD_POINTER -DENABLE_TRACE #-DTEST_STRING_KEYS #-DNBD32 
 INCS   := $(addprefix -I, include)
-TESTS  := output/map_test1 output/map_test2 output/txn_test
+TESTS  := output/map_test2 output/map_test1 output/txn_test 
 EXES   := $(TESTS)
 
-RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c datatype/nstring.c 
+RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c datatype/nstring.c runtime/hazard.c
 MAP_SRCS     := map/map.c map/list.c map/skiplist.c map/hashtable.c
 
+haz_test_SRCS  := $(RUNTIME_SRCS) test/haz_test.c
 rcu_test_SRCS  := $(RUNTIME_SRCS) test/rcu_test.c
 txn_test_SRCS  := $(RUNTIME_SRCS) $(MAP_SRCS) test/txn_test.c test/CuTest.c txn/txn.c
 map_test1_SRCS := $(RUNTIME_SRCS) $(MAP_SRCS) test/map_test1.c 
index a9916465104b196d59647ce926cae5b98296155e..ba62692b8414cefdcbf685c6bab5ded5d1723904 100644 (file)
@@ -16,6 +16,7 @@
 #include "common.h"
 #include "murmur.h"
 #include "mem.h"
+#include "rcu.h"
 #include "hashtable.h"
 
 #ifndef NBD32
@@ -498,11 +499,11 @@ static void hti_defer_free (hti_t *hti) {
             continue;
         assert(!IS_TAGGED(val, TAG1) || val == TAG_VALUE(TOMBSTONE, TAG1)); // copy not in progress
         if (hti->ht->key_type != NULL && key != DOES_NOT_EXIST) {
-            nbd_defer_free(GET_PTR(key));
+            rcu_defer_free(GET_PTR(key));
         }
     }
-    nbd_defer_free((void *)hti->table);
-    nbd_defer_free(hti);
+    rcu_defer_free((void *)hti->table);
+    rcu_defer_free(hti);
 }
 
 static void hti_release (hti_t *hti) {
index f91b4184410bf16cdebdac7fada3ea13232112d3..0d9c11f24f275254c238e9a538f5e8332b3b2729 100644 (file)
 #include "common.h"
 #include "list.h"
 #include "mem.h"
+#ifdef LIST_USE_HAZARD_POINTER
+#include "hazard.h"
+#else
+#include "rcu.h"
+#endif
 
 typedef struct node {
     map_key_t  key;
@@ -20,7 +25,7 @@ typedef struct node {
 } node_t;
 
 struct ll_iter {
-    node_t *next;
+    node_t *pred;
 };
 
 struct ll {
@@ -53,6 +58,9 @@ void ll_free (list_t *ll) {
     node_t *item = STRIP_MARK(ll->head->next);
     while (item != NULL) {
         node_t *next = STRIP_MARK(item->next);
+        if (ll->key_type != NULL) {
+            nbd_free((void *)item->key);
+        }
         nbd_free(item);
         item = next;
     }
@@ -70,12 +78,27 @@ size_t ll_count (list_t *ll) {
     return count;
 }
 
+#ifdef LIST_USE_HAZARD_POINTER
+static void nbd_free_node (node_t *x) {
+    nbd_free((void *)x->key);
+    nbd_free(x);
+}
+#endif
+
 static int find_pred (node_t **pred_ptr, node_t **item_ptr, list_t *ll, map_key_t key, int help_remove) {
     node_t *pred = ll->head;
     node_t *item = GET_NODE(pred->next);
     TRACE("l2", "find_pred: searching for key %p in list (head is %p)", key, pred);
+#ifdef LIST_USE_HAZARD_POINTER
+    haz_t *temp, *hp0 = haz_get_static(0), *hp1 = haz_get_static(1);
+#endif
 
     while (item != NULL) {
+#ifdef LIST_USE_HAZARD_POINTER
+        haz_set(hp0, item);
+        if (STRIP_MARK(pred->next) != item)
+            return find_pred(pred_ptr, item_ptr, ll, key, help_remove); // retry
+#endif
         markable_t next = item->next;
 
         // A mark means the node is logically removed but not physically unlinked yet.
@@ -102,11 +125,15 @@ static int find_pred (node_t **pred_ptr, node_t **item_ptr, list_t *ll, map_key_
                 TRACE("l3", "find_pred: now current item is %p next is %p", item, next);
 
                 // The thread that completes the unlink should free the memory.
-                node_t *unlinked = GET_NODE(other);
+#ifdef LIST_USE_HAZARD_POINTER
+                free_t free_ = (ll->key_type != NULL ? (free_t)nbd_free_node : nbd_free);
+                haz_defer_free(GET_NODE(other), free_);
+#else
                 if (ll->key_type != NULL) {
-                    nbd_defer_free((void *)unlinked->key);
+                    rcu_defer_free((void *)GET_NODE(other)->key);
                 }
-                nbd_defer_free(unlinked);
+                rcu_defer_free(GET_NODE(other));
+#endif
             } else {
                 TRACE("l2", "find_pred: lost a race to unlink item %p from pred %p", item, pred);
                 TRACE("l2", "find_pred: pred's link changed to %p", other, 0);
@@ -135,7 +162,9 @@ static int find_pred (node_t **pred_ptr, node_t **item_ptr, list_t *ll, map_key_
             if (pred_ptr != NULL) {
                 *pred_ptr = pred;
             }
-            *item_ptr = item;
+            if (item_ptr != NULL) {
+                *item_ptr = item;
+            }
             if (d == 0) {
                 TRACE("l2", "find_pred: found matching item %p in list, pred is %p", item, pred);
                 return TRUE;
@@ -145,6 +174,9 @@ static int find_pred (node_t **pred_ptr, node_t **item_ptr, list_t *ll, map_key_
         }
 
         pred = item;
+#ifdef LIST_USE_HAZARD_POINTER
+        temp = hp0; hp0 = hp1; hp1 = temp;
+#endif
         item = GET_NODE(next);
     }
 
@@ -283,10 +315,15 @@ map_val_t ll_remove (list_t *ll, map_key_t key) {
     } 
 
     // The thread that completes the unlink should free the memory.
+#ifdef LIST_USE_HAZARD_POINTER
+    free_t free_ = (ll->key_type != NULL ? (free_t)nbd_free_node : nbd_free);
+    haz_defer_free(GET_NODE(item), free_);
+#else
     if (ll->key_type != NULL) {
-        nbd_defer_free((void *)item->key);
+        rcu_defer_free((void *)item->key);
     }
-    nbd_defer_free(item);
+    rcu_defer_free(item);
+#endif
     TRACE("l1", "ll_remove: successfully unlinked item %p from the list", item, 0);
     return val;
 }
@@ -295,13 +332,10 @@ void ll_print (list_t *ll) {
     markable_t next = ll->head->next;
     int i = 0;
     while (next != DOES_NOT_EXIST) {
-        if (HAS_MARK(next)) {
-            printf("*");
-        }
         node_t *item = STRIP_MARK(next);
         if (item == NULL)
             break;
-        printf("%p:0x%llx ", item, (uint64_t)item->key);
+        printf("%s%p:0x%llx ", HAS_MARK(item->next) ? "*" : "", item, (uint64_t)item->key);
         fflush(stdout);
         if (i++ > 30) {
             printf("...");
@@ -315,30 +349,49 @@ void ll_print (list_t *ll) {
 ll_iter_t *ll_iter_begin (list_t *ll, map_key_t key) {
     ll_iter_t *iter = (ll_iter_t *)nbd_malloc(sizeof(ll_iter_t));
     if (key != DOES_NOT_EXIST) {
-        find_pred(NULL, &iter->next, ll, key, FALSE);
+        find_pred(&iter->pred, NULL, ll, key, FALSE);
     } else {
-        iter->next = GET_NODE(ll->head->next);
+        iter->pred = ll->head;
     }
+#ifdef LIST_USE_HAZARD_POINTER
+    haz_register_dynamic((void **)&iter->pred);
+#endif
     return iter;
 }
 
 map_val_t ll_iter_next (ll_iter_t *iter, map_key_t *key_ptr) {
     assert(iter);
-    node_t *item = iter->next;
-    while (item != NULL && HAS_MARK(item->next)) {
-        item = STRIP_MARK(item->next);
-    }
-    if (item == NULL) {
-        iter->next = NULL;
+    if (iter->pred == NULL)
         return DOES_NOT_EXIST;
-    }
-    iter->next = STRIP_MARK(item->next);
+
+    // advance iterator to next item; skip items that have been removed
+    markable_t item;
+#ifdef LIST_USE_HAZARD_POINTER 
+    haz_t *hp0 = haz_get_static(0);
+#endif
+    do {
+#ifndef LIST_USE_HAZARD_POINTER 
+        item = iter->pred->next;
+#else //LIST_USE_HAZARD_POINTER 
+        do {
+            item = iter->pred->next;
+            haz_set(hp0, STRIP_MARK(item));
+        } while (item != ((volatile node_t *)iter->pred)->next);
+#endif//LIST_USE_HAZARD_POINTER
+        iter->pred = STRIP_MARK(item);
+        if (iter->pred == NULL)
+            return DOES_NOT_EXIST;
+    } while (HAS_MARK(item));
+
     if (key_ptr != NULL) {
-        *key_ptr = item->key;
+        *key_ptr = GET_NODE(item)->key;
     }
-    return item->val;
+    return GET_NODE(item)->val;
 }
 
 void ll_iter_free (ll_iter_t *iter) {
+#ifdef LIST_USE_HAZARD_POINTER
+    haz_unregister_dynamic((void **)&iter->pred);
+#endif
     nbd_free(iter);
 }
index e9c9b55d55a46e14d066a1d7209e6fdb65d23771..9887a43c7198f13e4b9930856289cf4228252968 100644 (file)
 #include <string.h>
 
 #include "common.h"
-#include "runtime.h"
 #include "skiplist.h"
+#include "runtime.h"
 #include "mem.h"
+#include "rcu.h"
 
 // Setting MAX_LEVEL to 0 essentially makes this data structure the Harris-Michael lock-free list (in list.c).
 #define MAX_LEVEL 31
@@ -170,9 +171,9 @@ static node_t *find_preds (node_t **preds, node_t **succs, int n, skiplist_t *sl
                     if (level == 0) {
                         node_t *unlinked = GET_NODE(other);
                         if (sl->key_type != NULL) {
-                            nbd_defer_free((void *)unlinked->key);
+                            rcu_defer_free((void *)unlinked->key);
                         }
-                        nbd_defer_free(unlinked);
+                        rcu_defer_free(unlinked);
                     }
                 } else {
                     TRACE("s3", "find_preds: lost race to unlink item %p from pred %p", item, pred);
@@ -438,9 +439,9 @@ map_val_t sl_remove (skiplist_t *sl, map_key_t key) {
         TRACE("s2", "sl_remove: unlinked item %p from the skiplist at level 0", item, 0);
         // The thread that completes the unlink should free the memory.
         if (sl->key_type != NULL) {
-            nbd_defer_free((void *)item->key);
+            rcu_defer_free((void *)item->key);
         }
-        nbd_defer_free(item);
+        rcu_defer_free(item);
     }
     return val;
 }
diff --git a/runtime/hazard.c b/runtime/hazard.c
new file mode 100644 (file)
index 0000000..72ed0c1
--- /dev/null
@@ -0,0 +1,164 @@
+/* 
+ * Written by Josh Dybnis and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ *
+ * hazard pointers
+ *
+ * www.research.ibm.com/people/m/michael/ieeetpds-2004.pdf
+ *
+ */
+#include "common.h"
+#include "lwt.h"
+#include "mem.h"
+#include "tls.h"
+#include "runtime.h"
+#include "hazard.h"
+
+typedef struct pending {
+    void * ptr; 
+    free_t free_; 
+} pending_t;
+
+typedef struct haz_local {
+    pending_t *pending; // to be freed
+    int pending_size;
+    int pending_count;
+
+    haz_t static_haz[STATIC_HAZ_PER_THREAD];
+
+    haz_t **dynamic;
+    int dynamic_size;
+    int dynamic_count;
+
+} haz_local_t;
+
+static haz_local_t haz_local_[MAX_NUM_THREADS] = {};
+
+static void sort_hazards (haz_t *hazards, int n) {
+    return;
+}
+
+static int search_hazards (void *p, haz_t *hazards, int n) {
+    for (int i = 0; i < n; ++i) {
+        if (hazards[i] == p) 
+            return TRUE;
+    }
+    return FALSE;
+}
+
+static void resize_pending (void) {
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    haz_local_t *l = haz_local_ + tid_;
+    pending_t *p = nbd_malloc(sizeof(pending_t) * l->pending_size * 2);
+    memcpy(p, l->pending, l->pending_size);
+    nbd_free(l->pending);
+    l->pending = p;
+    l->pending_size *= 2;
+}
+
+void haz_defer_free (void *d, free_t f) {
+    assert(d);
+    assert(f);
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    haz_local_t *l = haz_local_ + tid_;
+    while (l->pending_count == l->pending_size) {
+
+        if (l->pending_size == 0) {
+            l->pending_size = MAX_NUM_THREADS * STATIC_HAZ_PER_THREAD;
+            l->pending = nbd_malloc(sizeof(pending_t) * l->pending_size);
+            break;
+        }
+
+        // scan for hazard pointers
+        haz_t *hazards = nbd_malloc(sizeof(haz_t) * l->pending_size);
+        int    hazard_count = 0;
+        for (int i = 0; i < MAX_NUM_THREADS; ++i) {
+            haz_local_t *h = haz_local_ + i;
+            for (int j = 0; j < STATIC_HAZ_PER_THREAD; ++j) {
+                if (h->static_haz[j] != NULL) {
+                    if (hazard_count == l->pending_size) {
+                        resize_pending();
+                        nbd_free(hazards);
+                        haz_defer_free(d, f);
+                        return;
+                    }
+                    hazards[hazard_count++] = h->static_haz[j];
+                }
+            }
+            for (int j = 0; j < h->dynamic_count; ++j) {
+                if (h->dynamic[j] != NULL && *h->dynamic[j] != NULL) {
+                    if (hazard_count == l->pending_size) {
+                        resize_pending();
+                        nbd_free(hazards);
+                        haz_defer_free(d, f);
+                        return;
+                    }
+                    hazards[hazard_count++] = *h->dynamic[j];
+                }
+            }
+        }
+        sort_hazards(hazards, hazard_count);
+
+        // check for conflicts
+        int  conflicts_count = 0;
+        for (int i = 0; i < l->pending_count; ++i) {
+            pending_t *p = l->pending + i;
+            if (search_hazards(p->ptr, hazards, hazard_count)) {
+                l->pending[conflicts_count++] = *p; // put conflicts back on the pending list
+            } else {
+                assert(p->free_);
+                assert(p->ptr);
+                p->free_(p->ptr); // free pending item
+            }
+        }
+        l->pending_count = conflicts_count;
+        nbd_free(hazards);
+    }
+    l->pending[ l->pending_count ].ptr  = d;
+    l->pending[ l->pending_count ].free_ = f;
+    l->pending_count++;
+}
+
+haz_t *haz_get_static (int i) {
+    if (i >= STATIC_HAZ_PER_THREAD)
+        return NULL;
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    return &haz_local_[tid_].static_haz[i];
+}
+
+void haz_register_dynamic (haz_t *haz) {
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    haz_local_t *l = haz_local_ + tid_;
+
+    if (l->dynamic_size == 0) {
+        l->dynamic_size = MAX_NUM_THREADS * STATIC_HAZ_PER_THREAD;
+        l->dynamic = nbd_malloc(sizeof(haz_t *) * l->dynamic_size);
+    }
+
+    if (l->dynamic_count == l->dynamic_size) {
+        haz_t **d = nbd_malloc(sizeof(haz_t *) * l->dynamic_size * 2);
+        memcpy(d, l->dynamic, l->dynamic_size);
+        nbd_free(l->dynamic);
+        l->dynamic = d;
+        l->dynamic_size *= 2;
+    }
+
+    l->dynamic[ l->dynamic_count++ ] = haz;
+}
+
+// assumes <haz> was registered in the same thread
+void haz_unregister_dynamic (void **haz) {
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    haz_local_t *l = haz_local_ + tid_;
+
+    for (int i = 0; i < l->dynamic_count; ++i) {
+        if (l->dynamic[i] == haz) {
+            if (i != l->dynamic_count - 1) {
+                l->dynamic[i] = l->dynamic[ l->dynamic_count ];
+            }
+            l->dynamic_count--;
+            return;
+        }
+    }
+    assert(0);
+}
index 0b7cd5b7be8a0c23662a74cb3ea53eeb1a3e343a..1787a6253f299e67c01c063758aa26399d64bc87 100644 (file)
@@ -11,7 +11,7 @@
 #include "rlocal.h"
 #include "lwt.h"
 
-#define GET_SCALE(n) (sizeof(n)*8-__builtin_clzl((n)-1)) // log2 of <n>, rounded up
+#define GET_SCALE(n) (sizeof(void *)*__CHAR_BIT__ - __builtin_clzl((n) - 1)) // log2 of <n>, rounded up
 #define MAX_SCALE 31 // allocate blocks up to 4GB in size (arbitrary, could be bigger)
 #define REGION_SCALE 22 // 4MB regions
 #define REGION_SIZE (1 << REGION_SCALE)
@@ -67,6 +67,9 @@ void nbd_free (void *x) {
     block_t  *b = (block_t *)x;
     assert(((size_t)b >> REGION_SCALE) < ((1 << HEADER_REGION_SCALE) / sizeof(header_t)));
     header_t *h = region_header_ + ((size_t)b >> REGION_SCALE);
+#ifndef NDEBUG
+    memset(b, 0xcd, (1 << h->scale));
+#endif
     TRACE("m0", "nbd_free(): block %p scale %llu", b, h->scale);
     if (h->owner == tid_) {
         TRACE("m0", "nbd_free(): private block, free list head %p", 
@@ -76,8 +79,9 @@ void nbd_free (void *x) {
     } else {
         TRACE("m0", "nbd_free(): owner %llu free list head %p", 
                     h->owner, pub_free_list_[h->owner][h->scale][tid_]);
-        b->next = pub_free_list_[h->owner][h->scale][tid_];
-        pub_free_list_[h->owner][h->scale][tid_] = b;
+        do {
+            b->next = pub_free_list_[h->owner][h->scale][tid_];
+        } while (SYNC_CAS(&pub_free_list_[h->owner][h->scale][tid_], b->next, b) != b->next);
     }
 }
 
index 5ae851be58f5ef7021a10a78aeda6142e01e181e..50205e8670173c2d2ab55089ae1cac4b4bb80901 100644 (file)
@@ -12,6 +12,7 @@
 #include "lwt.h"
 #include "mem.h"
 #include "tls.h"
+#include "rcu.h"
 
 #define RCU_POST_THRESHOLD 10
 #define RCU_QUEUE_SCALE 20
@@ -71,17 +72,18 @@ void rcu_update (void) {
     }
 }
 
-void nbd_defer_free (void *x) {
+void rcu_defer_free (void *x) {
+    assert(x);
     LOCALIZE_THREAD_LOCAL(tid_, int);
     fifo_t *q = pending_[tid_];
     assert(MOD_SCALE(q->head + 1, q->scale) != MOD_SCALE(q->tail, q->scale));
     uint32_t i = MOD_SCALE(q->head++, q->scale);
     q->x[i] = x;
-    TRACE("r0", "nbd_defer_free: put %p on queue at position %llu", x, pending_[tid_]->head);
+    TRACE("r0", "rcu_defer_free: put %p on queue at position %llu", x, pending_[tid_]->head);
 
     if (pending_[tid_]->head - rcu_last_posted_[tid_][tid_] < RCU_POST_THRESHOLD)
         return;
-    TRACE("r0", "nbd_defer_free: posting %llu", pending_[tid_]->head, 0);
+    TRACE("r0", "rcu_defer_free: posting %llu", pending_[tid_]->head, 0);
     int next_thread_id = (tid_ + 1) % num_threads_;
     rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head;
 }
index d4946cfc0b47dd8fb8e673915786a41e44eb4958..14a09f9bb11f19b8a828ca1473c3681d0f9f3910 100644 (file)
@@ -18,7 +18,7 @@ typedef struct thread_info {
     void *restrict arg;
 } thread_info_t;
 
-void nbd_init (void) {
+__attribute__ ((constructor)) void nbd_init (void) {
     sranddev();
     INIT_THREAD_LOCAL(rand_seed_);
     INIT_THREAD_LOCAL(tid_);
diff --git a/test/haz_test.c b/test/haz_test.c
new file mode 100644 (file)
index 0000000..9e6f1f2
--- /dev/null
@@ -0,0 +1,117 @@
+/* 
+ * Written by Josh Dybnis and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ *
+ * hazard pointer test
+ *
+ */
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+#include <pthread.h>
+#include <sys/time.h>
+#include "common.h"
+#include "mem.h"
+#include "runtime.h"
+#include "hazard.h"
+
+#define NUM_ITERATIONS 10000000
+#define MAX_NUM_THREADS 4
+
+typedef struct node {
+    struct node *next;
+} node_t;
+
+typedef struct lifo {
+    node_t *head;
+} lifo_t;
+
+static volatile int wait_;
+static lifo_t *stk_;
+
+void *worker (void *arg) {
+    int id = (int)(size_t)arg;
+    unsigned int r = (unsigned int)(id + 1) * 0x5bd1e995; // seed "random" number generator
+    haz_t *hp0 = haz_get_static(0);
+
+    // Wait for all the worker threads to be ready.
+    __sync_fetch_and_add(&wait_, -1);
+    do {} while (wait_); 
+
+    int i;
+    for (i = 0; i < NUM_ITERATIONS; ++ i) {
+        r ^= r << 6; r ^= r >> 21; r ^= r << 7; // generate next "random" number
+        if (r & 0x1000) {
+            // push
+            node_t *new_head = (node_t *)nbd_malloc(sizeof(node_t));
+            node_t *old_head = stk_->head;
+            node_t *temp;
+            do {
+                temp = old_head;
+                new_head->next = temp;
+            } while ((old_head = __sync_val_compare_and_swap(&stk_->head, temp, new_head)) != temp);
+        } else {
+            // pop
+            node_t *temp;
+            node_t *head = stk_->head;
+            do {
+                temp = head;
+                if (temp == NULL)
+                    break;
+                haz_set(hp0, temp);
+                head = ((volatile lifo_t *)stk_)->head;
+                if (temp != head)
+                    continue;
+            } while ((head = __sync_val_compare_and_swap(&stk_->head, temp, temp->next)) != temp);
+
+            if (temp != NULL) {
+                haz_defer_free(temp, nbd_free);
+            }
+        }
+    }
+
+    return NULL;
+}
+
+int main (int argc, char **argv) {
+    //lwt_set_trace_level("m0r0");
+
+    int num_threads = MAX_NUM_THREADS;
+    if (argc == 2)
+    {
+        errno = 0;
+        num_threads = strtol(argv[1], NULL, 10);
+        if (errno) {
+            fprintf(stderr, "%s: Invalid argument for number of threads\n", argv[0]);
+            return -1;
+        }
+        if (num_threads <= 0) {
+            fprintf(stderr, "%s: Number of threads must be at least 1\n", argv[0]);
+            return -1;
+        }
+    }
+
+    stk_ = (lifo_t *)nbd_malloc(sizeof(lifo_t)); 
+    memset(stk_, 0, sizeof(lifo_t));
+
+    struct timeval tv1, tv2;
+    gettimeofday(&tv1, NULL);
+    wait_ = num_threads;
+
+    pthread_t thread[num_threads];
+    for (int i = 0; i < num_threads; ++i) {
+        int rc = nbd_thread_create(thread + i, i, worker, (void *)(size_t)i);
+        if (rc != 0) { perror("pthread_create"); return rc; }
+    }
+    for (int i = 0; i < num_threads; ++i) {
+        pthread_join(thread[i], NULL);
+    }
+
+    gettimeofday(&tv2, NULL);
+    int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000;
+    printf("Th:%d Time:%dms\n\n", num_threads, ms);
+    fflush(stdout);
+
+    return 0;
+}
index a6a7192b1626fbf06e540e8ff033e6c2aa4c89f8..2e9ef859928609a807f6e0b1b039b78d038a518d 100644 (file)
@@ -7,6 +7,7 @@
 #include "nstring.h"
 #include "runtime.h"
 #include "map.h"
+#include "rcu.h"
 #include "list.h"
 #include "skiplist.h"
 #include "hashtable.h"
@@ -55,7 +56,6 @@ void *worker (void *arg) {
 }
 
 int main (int argc, char **argv) {
-    nbd_init();
     lwt_set_trace_level("r0m0l3");
 
     char* program_name = argv[0];
@@ -66,7 +66,7 @@ int main (int argc, char **argv) {
         return -1;
     }
 
-    num_threads_ = 1;
+    num_threads_ = MAX_NUM_THREADS;
     if (argc == 2)
     {
         errno = 0;
index f9444ca0077999c18a1f58f2dea19283219112c9..9fc6fe3297d14f3d50d19b71236f5ec91ed5193a 100644 (file)
@@ -20,6 +20,7 @@
 #include "hashtable.h"
 #include "lwt.h"
 #include "mem.h"
+#include "rcu.h"
 
 #define ASSERT_EQUAL(x, y) CuAssertIntEquals(tc, x, y)
 
@@ -316,8 +317,7 @@ void big_iteration_test (CuTest* tc) {
 
 int main (void) {
 
-    nbd_init();
-    lwt_set_trace_level("h3");
+    lwt_set_trace_level("l3");
 
     static const map_impl_t *map_types[] = { &ll_map_impl, &sl_map_impl, &ht_map_impl };
     for (int i = 0; i < sizeof(map_types)/sizeof(*map_types); ++i) {
@@ -327,10 +327,10 @@ int main (void) {
         CuString *output = CuStringNew();
         CuSuite* suite = CuSuiteNew();
 
+        SUITE_ADD_TEST(suite, concurrent_add_remove_test);
         SUITE_ADD_TEST(suite, basic_test);
         SUITE_ADD_TEST(suite, basic_iteration_test);
         SUITE_ADD_TEST(suite, big_iteration_test);
-        SUITE_ADD_TEST(suite, concurrent_add_remove_test);
 
         CuSuiteRun(suite);
         CuSuiteDetails(suite, output);
index 5a7606ad7a81fbea60941069bf1e902bad1a1c1d..8ad809635b0eae8ce09705585a71cc30d68bf81b 100644 (file)
@@ -26,7 +26,6 @@ void test1 (CuTest* tc) {
 
 int main (void) {
 
-    nbd_init();
     txn_init();
 
     CuString *output = CuStringNew();
diff --git a/todo b/todo
index d2755436647f10e7528b00db446e79d27a8b581b..dbdab1289ed2788b38ad7122a51b8ea0905f922a 100644 (file)
--- a/todo
+++ b/todo
 
 memory manangement
 ------------------
-- make rcu yield when its buffer gets full instead of throwing an assert
+- allow threads to dynamically enter and exit rcu's token ring
+- augment rcu with heartbeat manager to kill stalled threads
+- make rcu try yielding when its buffer gets full
 - alternate memory reclamation schemes: hazard pointers and/or reference counting
 - seperate nbd_malloc/nbd_free into general purpose malloc/free replacement
 
 quality
 -------
-- verify the key management in list, skiplist, and hashtable
+- verify the key memory management in list, skiplist, and hashtable
 - transaction tests
 - port perf tests from lib-high-scale
 - characterize the performance of hashtable vs. skiplist vs. list
index 5648d16abbba2b2d11b3f0b75bbbb267251746cb..1cc8e2f84fdeff975d719bfada0608bed62c42bb 100644 (file)
--- a/txn/txn.c
+++ b/txn/txn.c
@@ -5,6 +5,7 @@
 #include "common.h"
 #include "txn.h"
 #include "mem.h"
+#include "rcu.h"
 #include "skiplist.h"
 
 #define UNDETERMINED_VERSION 0
@@ -196,8 +197,8 @@ void txn_abort (txn_t *txn) {
         update->version = ABORTED_VERSION;
     }
 
-    nbd_defer_free(txn->writes);
-    nbd_defer_free(txn);
+    rcu_defer_free(txn->writes);
+    rcu_defer_free(txn);
 }
 
 txn_state_e txn_commit (txn_t *txn) {
@@ -228,8 +229,8 @@ txn_state_e txn_commit (txn_t *txn) {
         }
     } while (old_count != temp);
 
-    nbd_defer_free(txn->writes);
-    nbd_defer_free(txn);
+    rcu_defer_free(txn->writes);
+    rcu_defer_free(txn);
 
     return state;
 }
@@ -341,7 +342,7 @@ map_val_t txn_map_get (txn_t *txn, map_key_t key) {
         }
         if (update->version <= min_active_version) {
             if (map_cas(txn->map, key, TAG_VALUE(val, TAG2), value) == TAG_VALUE(val, TAG2)) {
-                nbd_defer_free(update);
+                rcu_defer_free(update);
             }
         }
     }