work in progress master
authorjdybnis <jdybnis@9ec2166a-aeea-11dd-8830-69e4bb380a4a>
Sun, 10 May 2009 02:11:03 +0000 (02:11 +0000)
committerjdybnis <jdybnis@9ec2166a-aeea-11dd-8830-69e4bb380a4a>
Sun, 10 May 2009 02:11:03 +0000 (02:11 +0000)
Warning: does not compile

16 files changed:
include/lwt.h
include/runtime.h
include/tls.h
makefile
runtime/hazard.c
runtime/lwt.c
runtime/mem.c
runtime/random.c [new file with mode: 0644]
runtime/rcu.c
runtime/rlocal.h
runtime/runtime.c
test/map_test1.c
test/map_test2.c
test/perf_test.c
test/rcu_test.c
test/txn_test.c

index 5c0b6a4da92e6c7c4dec6e86b7739f8b7c5398e9..085f6dd4031c88f67f9eb0fd8f8d65cb8d86b1bd 100644 (file)
@@ -34,8 +34,8 @@ void lwt_set_trace_level (const char *flags);
 // the dump. It is only included when its specified category is enabled at a trace level greater than or equal to
 // the one in <flag>. Categories are case sensitive. 
 static inline void lwt_trace (const char *flag, const char *format, size_t value1, size_t value2) {
-    extern char flag_state_[256];
-    if (EXPECT_FALSE(flag_state_[(unsigned)flag[0]] >= flag[1])) {
+    extern char TraceLevel[256];
+    if (EXPECT_FALSE(TraceLevel[(unsigned)flag[0]] >= flag[1])) {
         // embed <flags> in <format> so we don't have to make the lwt_record_t any bigger than it already is
         uint64_t f = ((uint64_t)(size_t)format | ((uint64_t)flag[0] << 56) | ((uint64_t)flag[1] << 48));
         extern void lwt_trace_i (uint64_t format, size_t value1, size_t value2);
index e453fecac9b9425123b05170c7e0c10f5b22df29..4fe4cbf67558a989081f48b996e50b5b61e80912 100644 (file)
@@ -8,11 +8,7 @@
 #include <pthread.h>
 #include "tls.h"
 
-extern DECLARE_THREAD_LOCAL(tid_, int);
-
-int nbd_thread_create (pthread_t *restrict thread, int thread_id, void *(*start_routine)(void *), void *restrict arg);
+void nbd_thread_init (void);
 uint64_t nbd_rand (void);
-uint64_t nbd_rand_seed (int i);
-int nbd_next_rand (uint64_t *r);
 
 #endif//RUNTIME_H
index 5f3d0e17ca784c274a9d41a0cc6e94c0126e1ef3..c496dab9faac6e14a205e771db558ecbd55bbd34 100644 (file)
 
 #define INIT_THREAD_LOCAL(name) \
     do { \
-        if (pthread_key_create(&name##_KEY, NULL) != 0) { assert(FALSE); } \
+        if (pthread_key_create(&name##_KEY, NULL) != 0) { \
+            assert("error initializing thread local variable " #name, FALSE); \
+        } \
     } while (0)
 
-#define SET_THREAD_LOCAL(name, value) pthread_setspecific(name##_KEY, (void *)(size_t)value);
+#define SET_THREAD_LOCAL(name, value) \
+    do { \
+        name = value; \
+        pthread_setspecific(name##_KEY, (void *)(size_t)value); \
+    } while (0);
 
 #define LOCALIZE_THREAD_LOCAL(name, type) type name = (type)(size_t)pthread_getspecific(name##_KEY)
 
index 0130026c8e0a7a78abb9dfd86ecb75c9a04f1c0b..d9863f2a0111bec1fc8069fcfff905bb347ac8eb 100644 (file)
--- a/makefile
+++ b/makefile
@@ -5,15 +5,16 @@
 # Makefile for building programs with whole-program interfile optimization
 ###################################################################################################
 CFLAGS0 := -Wall -Werror -std=gnu99 -lpthread #-m32 -DNBD32
-CFLAGS1 := $(CFLAGS0) -g -O3 #-DNDEBUG #-fwhole-program -combine
+CFLAGS1 := $(CFLAGS0) -g #-DNDEBUG #-fwhole-program -combine
 CFLAGS2 := $(CFLAGS1) #-DENABLE_TRACE
 CFLAGS3 := $(CFLAGS2) #-DLIST_USE_HAZARD_POINTER
 CFLAGS  := $(CFLAGS3) #-DNBD_SINGLE_THREADED #-DUSE_SYSTEM_MALLOC #-DTEST_STRING_KEYS
 INCS    := $(addprefix -I, include)
-TESTS   := output/perf_test #output/map_test1 output/map_test2 output/rcu_test output/txn_test #output/haz_test
+TESTS   := output/perf_test output/map_test1 output/map_test2 output/rcu_test output/txn_test #output/haz_test
 OBJS    := $(TESTS)
 
-RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c datatype/nstring.c #runtime/hazard.c
+RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c runtime/random.c \
+                               datatype/nstring.c #runtime/hazard.c
 MAP_SRCS     := map/map.c map/list.c map/skiplist.c map/hashtable.c
 
 haz_test_SRCS  := $(RUNTIME_SRCS) test/haz_test.c
index 3ebed2ee0ca7f267a0926e5f66e662f6216a7d8c..431c57665029b72db1d21c8fd34b90458cce3184 100644 (file)
@@ -53,8 +53,8 @@ static int search_hazards (void *p, haz_t *hazards, int n) {
 
 static void resize_pending (void) {
     TRACE("H2", "haz_resize_pending", 0, 0);
-    LOCALIZE_THREAD_LOCAL(tid_, int);
-    haz_local_t *l = haz_local_ + tid_;
+    LOCALIZE_THREAD_LOCAL(ThreadId, int);
+    haz_local_t *l = haz_local_ + ThreadId;
     pending_t *p = nbd_malloc(sizeof(pending_t) * l->pending_size * 2);
     memcpy(p, l->pending, l->pending_size);
     nbd_free(l->pending);
@@ -66,8 +66,8 @@ void haz_defer_free (void *d, free_t f) {
     TRACE("H1", "haz_defer_free: %p (%p)", d, f);
     assert(d);
     assert(f);
-    LOCALIZE_THREAD_LOCAL(tid_, int);
-    haz_local_t *l = haz_local_ + tid_;
+    LOCALIZE_THREAD_LOCAL(ThreadId, int);
+    haz_local_t *l = haz_local_ + ThreadId;
     while (l->pending_count == l->pending_size) {
 
         if (l->pending_size == 0) {
@@ -131,17 +131,17 @@ haz_t *haz_get_static (int i) {
     TRACE("H1", "haz_get_static: %p", i, 0);
     if (i >= STATIC_HAZ_PER_THREAD)
         return NULL;
-    LOCALIZE_THREAD_LOCAL(tid_, int);
+    LOCALIZE_THREAD_LOCAL(ThreadId, int);
     assert(i < STATIC_HAZ_PER_THREAD);
-    haz_t *ret = &haz_local_[tid_].static_haz[i];
+    haz_t *ret = &haz_local_[ThreadId].static_haz[i];
     TRACE("H1", "haz_get_static: returning %p", ret, 0);
     return ret;
 }
 
 void haz_register_dynamic (haz_t *haz) {
     TRACE("H1", "haz_register_dynamic: %p", haz, 0);
-    LOCALIZE_THREAD_LOCAL(tid_, int);
-    haz_local_t *l = haz_local_ + tid_;
+    LOCALIZE_THREAD_LOCAL(ThreadId, int);
+    haz_local_t *l = haz_local_ + ThreadId;
 
     if (l->dynamic_size == 0) {
         int n = MAX_NUM_THREADS * STATIC_HAZ_PER_THREAD;
@@ -163,8 +163,8 @@ void haz_register_dynamic (haz_t *haz) {
 // assumes <haz> was registered in the same thread
 void haz_unregister_dynamic (void **haz) {
     TRACE("H1", "haz_unregister_dynamic: %p", haz, 0);
-    LOCALIZE_THREAD_LOCAL(tid_, int);
-    haz_local_t *l = haz_local_ + tid_;
+    LOCALIZE_THREAD_LOCAL(ThreadId, int);
+    haz_local_t *l = haz_local_ + ThreadId;
 
     for (int i = 0; i < l->dynamic_count; ++i) {
         if (l->dynamic[i] == haz) {
index a986567bcb05023f8e6bafb541035fe5266f9900..47bfcebb53f31760924139e51ccede621e604a01 100644 (file)
@@ -28,35 +28,34 @@ typedef struct lwt_buffer {
     lwt_record_t x[0];
 } lwt_buffer_t;
 
-lwt_buffer_t *lwt_buf_[MAX_NUM_THREADS] = {};
-char flag_state_[256] = {};
-static const char *flags_ = "";
-
-void lwt_thread_init (int thread_id)
-{
-    assert(thread_id < MAX_NUM_THREADS);
-    if (lwt_buf_[thread_id] == NULL) {
-        lwt_buf_[thread_id] = (lwt_buffer_t *)nbd_malloc(sizeof(lwt_buffer_t) + sizeof(lwt_record_t) * LWT_BUFFER_SIZE);
-        memset(lwt_buf_[thread_id], 0, sizeof(lwt_buffer_t));
+lwt_buffer_t *TraceBuffer[MAX_NUM_THREADS] = {};
+char TraceLevel[256] = {};
+static const char *TraceSpec = "";
+
+void lwt_thread_init (void) {
+    int thread_index = GET_THREAD_INDEX();
+
+    if (TraceBuffer[thread_index] == NULL) {
+        TraceBuffer[thread_index] = 
+            (lwt_buffer_t *)nbd_malloc(sizeof(lwt_buffer_t) + sizeof(lwt_record_t) * LWT_BUFFER_SIZE);
+        memset(TraceBuffer[thread_index], 0, sizeof(lwt_buffer_t));
     }
 }
 
-void lwt_set_trace_level (const char *flags)
-{
+void lwt_set_trace_level (const char *flags) {
     assert(strlen(flags) % 2 == 0); // a well formed <flags> should be an even number of characters long
-    flags_ = flags;
-    memset(flag_state_, 0, sizeof(flag_state_));
+    TraceSpec = flags;
+    memset(TraceLevel, 0, sizeof(TraceLevel));
     for (int i = 0; flags[i]; i+=2) {
-        flag_state_[(unsigned)flags[i]] = flags[i+1];
+        TraceLevel[(unsigned)flags[i]] = flags[i+1];
     }
 }
 
-static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset)
-{
+static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset) {
     // print the record if its trace category is enabled at a high enough level
     int flag  =  r->format >> 56;
     int level = (r->format >> 48) & 0xFF;
-    if (flag_state_[(unsigned)flag] >= level) {
+    if (TraceLevel[(unsigned)flag] >= level) {
         char s[3] = {flag, level, '\0'};
         fprintf(file, "%09llu %d %s ", ((uint64_t)r->timestamp - offset) >> 5, thread_id, s);
         const char *format = (const char *)(size_t)(r->format & MASK(48)); // strip out the embedded flags
@@ -65,18 +64,17 @@ static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t of
     }
 }
 
-static void dump_buffer (FILE *file, int thread_id, uint64_t offset)
-{
-    lwt_buffer_t *tb = lwt_buf_[thread_id]; 
+static void dump_buffer (FILE *file, int thread_index, uint64_t offset) {
+    lwt_buffer_t *tb = TraceBuffer[thread_index]; 
     assert(tb);
     if (tb->head > LWT_BUFFER_SIZE) {
         for (int i = tb->head & LWT_BUFFER_MASK; i < LWT_BUFFER_SIZE; ++i) {
-            dump_record(file, thread_id, tb->x + i, offset);
+            dump_record(file, thread_index + 1, tb->x + i, offset);
         }
     }
 
     for (int i = 0; i < (tb->head & LWT_BUFFER_MASK); ++i) {
-        dump_record(file, thread_id, tb->x + i, offset);
+        dump_record(file, thread_index + 1, tb->x + i, offset);
     }
 }
 
@@ -84,20 +82,19 @@ void lwt_halt (void) {
     halt_ = 1;
 }
 
-void lwt_dump (const char *file_name)
-{
+void lwt_dump (const char *file_name) {
     halt_ = 1;
     uint64_t offset = (uint64_t)-1;
 
     for (int i = 0; i < MAX_NUM_THREADS; ++i) {
-        if (lwt_buf_[i] != NULL && lwt_buf_[i]->head != 0) {
-            uint64_t x = lwt_buf_[i]->x[0].timestamp;
+        if (TraceBuffer[i] != NULL && TraceBuffer[i]->head != 0) {
+            uint64_t x = TraceBuffer[i]->x[0].timestamp;
             if (x < offset) {
                 offset = x;
             }
-            if (lwt_buf_[i]->head > LWT_BUFFER_SIZE)
+            if (TraceBuffer[i]->head > LWT_BUFFER_SIZE)
             {
-                x = lwt_buf_[i]->x[lwt_buf_[i]->head & LWT_BUFFER_MASK].timestamp;
+                x = TraceBuffer[i]->x[TraceBuffer[i]->head & LWT_BUFFER_MASK].timestamp;
                 if (x < offset) {
                     offset = x;
                 }
@@ -109,7 +106,7 @@ void lwt_dump (const char *file_name)
         FILE *file = fopen(file_name, "w");
         assert(file);
         for (int i = 0; i < MAX_NUM_THREADS; ++i) {
-            if (lwt_buf_[i] != NULL) {
+            if (TraceBuffer[i] != NULL) {
                 dump_buffer(file, i, offset);
             }
         }
@@ -120,8 +117,7 @@ void lwt_dump (const char *file_name)
 
 void lwt_trace_i (uint64_t format, size_t value1, size_t value2) {
     while (halt_) {}
-    LOCALIZE_THREAD_LOCAL(tid_, int);
-    lwt_buffer_t *tb = lwt_buf_[tid_];
+    lwt_buffer_t *tb = TraceBuffer[GET_THREAD_INDEX()];
     if (tb != NULL) {
         unsigned int u, l;
         __asm__ __volatile__("rdtsc" : "=a" (l), "=d" (u)); 
index 90a22e1f90b57e3d8b31d17618be2a3b95647258..2f55ff4d2108d93522d878e4549ef5812cc10721 100644 (file)
@@ -74,9 +74,9 @@ static inline header_t *get_header (void *r) {
 }
 
 static void *get_new_region (int block_scale) {
-    LOCALIZE_THREAD_LOCAL(tid_, int);
+    int thread_index = GET_THREAD_INDEX();
 #ifdef RECYCLE_PAGES
-    tl_t *tl = &tl_[tid_]; // thread-local data
+    tl_t *tl = &tl_[thread_index]; // thread-local data
     if (block_scale <= PAGE_SCALE && tl->free_pages != NULL) {
         void *region = tl->free_pages;
         tl->free_pages = tl->free_pages->next;
@@ -122,7 +122,7 @@ static void *get_new_region (int block_scale) {
     TRACE("m1", "get_new_region: header %p (%p)", h, h - headers_);
     assert(h->scale == 0);
     h->scale = block_scale;
-    h->owner = tid_;
+    h->owner = thread_index;
 
     return region;
 }
@@ -148,7 +148,6 @@ void mem_init (void) {
 void nbd_free (void *x) {
     TRACE("m1", "nbd_free: block %p page %p", x, (size_t)x & ~MASK(PAGE_SCALE));
     ASSERT(x);
-    LOCALIZE_THREAD_LOCAL(tid_, int);
     block_t  *b = (block_t *)x;
     header_t *h = get_header(x);
     int b_scale = h->scale;
@@ -164,8 +163,9 @@ void nbd_free (void *x) {
 #ifndef NDEBUG
     memset(b, 0xcd, (1ULL << b_scale)); // bear trap
 #endif
-    tl_t *tl = &tl_[tid_]; // thread-local data
-    if (h->owner == tid_) {
+    int thread_index = GET_THREAD_INDEX();
+    tl_t *tl = &tl_[thread_index]; // thread-local data
+    if (h->owner == thread_index) {
         TRACE("m1", "nbd_free: private block, old free list head %p", tl->free_list[b_scale], 0);
 
 #ifndef RECYCLE_PAGES
@@ -264,8 +264,7 @@ void *nbd_malloc (size_t n) {
     if (EXPECT_FALSE(b_scale < MIN_SCALE)) { b_scale = MIN_SCALE; }
     if (EXPECT_FALSE(b_scale > MAX_SCALE)) { return NULL; }
 
-    LOCALIZE_THREAD_LOCAL(tid_, int);
-    tl_t *tl = &tl_[tid_]; // thread-local data
+    tl_t *tl = &tl_[GET_THREAD_INDEX()]; // thread-local data
 
     block_t *b = pop_free_list(tl, b_scale);
     if (b != NULL) {
diff --git a/runtime/random.c b/runtime/random.c
new file mode 100644 (file)
index 0000000..d6d1cf0
--- /dev/null
@@ -0,0 +1,79 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+#include "common.h"
+#include "runtime.h"
+
+DECLARE_THREAD_LOCAL(rx_, uint32_t);
+DECLARE_THREAD_LOCAL(ry_, uint32_t);
+DECLARE_THREAD_LOCAL(rz_, uint32_t);
+DECLARE_THREAD_LOCAL(rc_, uint32_t);
+
+void rnd_init (void) {
+    INIT_THREAD_LOCAL(rx_);
+    INIT_THREAD_LOCAL(ry_);
+    INIT_THREAD_LOCAL(rz_);
+    INIT_THREAD_LOCAL(rc_);
+}
+
+// TODO: put a lock around this so that multiple threads being initialize concurrently don't read
+//       the same values from /dev/urandom
+void rnd_thread_init (void) {
+    int fd = open("/dev/urandom", O_RDONLY);
+    if (fd == -1) {
+        perror("Error opening /dev/urandom");
+        exit(1);
+    }
+
+    char buf[16];
+
+    int n = read(fd, buf, sizeof(buf));
+    if (n != 16) {
+        if (n == -1) {
+            perror("Error reading from /dev/urandom");
+        }
+        fprintf(stderr, "Could not read enough bytes from /dev/urandom");
+        exit(1);
+    }
+
+    uint32_t x, y, z, c;
+    memcpy(&x, buf +  0, 4);
+    memcpy(&y, buf +  4, 4);
+    memcpy(&z, buf +  8, 4);
+    memcpy(&c, buf + 12, 4);
+
+    SET_THREAD_LOCAL(rx_, x);
+    SET_THREAD_LOCAL(ry_, y);
+    SET_THREAD_LOCAL(rz_, z);
+    SET_THREAD_LOCAL(rc_, z);
+}
+
+// George Marsaglia's KISS generator
+//
+// Even though this returns 64 bits, this algorithm was only designed to generate 32 bits.
+// The upper 32 bits is going to be highly correlated with the lower 32 bits of the next call.
+uint64_t nbd_rand (void) {
+    LOCALIZE_THREAD_LOCAL(rx_, unsigned);
+    LOCALIZE_THREAD_LOCAL(ry_, unsigned);
+    LOCALIZE_THREAD_LOCAL(rz_, unsigned);
+    LOCALIZE_THREAD_LOCAL(rc_, unsigned);
+
+    uint32_t rx = 69069 * rx_ + 12345;
+    uint32_t ry = ry_;
+    ry ^= (ry << 13);
+    ry ^= (ry >> 17);
+    ry ^= (ry <<  5);
+    uint64_t t = rz_ * 698769069LL + rc_;
+    uint64_t r = rx + ry + t;
+
+    SET_THREAD_LOCAL(rx_, rx);
+    SET_THREAD_LOCAL(ry_, ry);
+    SET_THREAD_LOCAL(rz_, t);
+    SET_THREAD_LOCAL(rc_, t >> 32);
+
+    return r;
+}
index 12c37a3d4e37df2d5a340010ed039cde7efc1b3c..0d65a37f6dda8bca88b9e40e634a663b7637f776 100644 (file)
@@ -39,36 +39,35 @@ static fifo_t *fifo_alloc(int scale) {
     return q;
 }
 
-void rcu_thread_init (int id) {
-    assert(id < MAX_NUM_THREADS);
-    if (pending_[id] == NULL) {
-        pending_[id] = fifo_alloc(RCU_QUEUE_SCALE);
+void rcu_thread_init (void) {
+    int thread_index = GET_THREAD_INDEX();
+    if (pending_[thread_index] == NULL) {
+        pending_[thread_index] = fifo_alloc(RCU_QUEUE_SCALE);
         (void)SYNC_ADD(&num_threads_, 1);
     }
 }
 
 void rcu_update (void) {
-    LOCALIZE_THREAD_LOCAL(tid_, int);
-    assert(tid_ < num_threads_);
-    int next_thread_id = (tid_ + 1) % num_threads_;
-    TRACE("r1", "rcu_update: updating thread %llu", next_thread_id, 0);
+    int thread_index = GET_THREAD_INDEX();
+    int next_thread_index = (thread_index + 1) % num_threads_;
+    TRACE("r1", "rcu_update: updating thread %llu", next_thread_index, 0);
     int i;
     for (i = 0; i < num_threads_; ++i) {
-        if (i == tid_)
+        if (i == thread_index)
             continue;
 
         // No need to post an update if the value hasn't changed
-        if (rcu_[tid_][i] == rcu_last_posted_[tid_][i])
+        if (rcu_[thread_index][i] == rcu_last_posted_[thread_index][i])
             continue;
 
-        uint64_t x = rcu_[tid_][i];
-        rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x;
+        uint64_t x = rcu_[thread_index][i];
+        rcu_[next_thread_index][i] = rcu_last_posted_[thread_index][i] = x;
         TRACE("r2", "rcu_update: posted updated value (%llu) for thread %llu", x, i);
     }
 
     // free
-    fifo_t *q = pending_[tid_];
-    while (q->tail != rcu_[tid_][tid_]) {
+    fifo_t *q = pending_[thread_index];
+    while (q->tail != rcu_[thread_index][thread_index]) {
         uint32_t i = MOD_SCALE(q->tail, q->scale);
         TRACE("r0", "rcu_update: freeing %p from queue at position %llu", q->x[i], q->tail);
         nbd_free(q->x[i]);
@@ -78,17 +77,18 @@ void rcu_update (void) {
 
 void rcu_defer_free (void *x) {
     assert(x);
-    LOCALIZE_THREAD_LOCAL(tid_, int);
-    fifo_t *q = pending_[tid_];
+    int thread_index = GET_THREAD_INDEX();
+    fifo_t *q = pending_[thread_index];
     assert(MOD_SCALE(q->head + 1, q->scale) != MOD_SCALE(q->tail, q->scale));
     uint32_t i = MOD_SCALE(q->head, q->scale);
     q->x[i] = x;
     TRACE("r0", "rcu_defer_free: put %p on queue at position %llu", x, q->head);
     q->head++;
 
-    if (pending_[tid_]->head - rcu_last_posted_[tid_][tid_] >= RCU_POST_THRESHOLD) {
-        TRACE("r0", "rcu_defer_free: posting %llu", pending_[tid_]->head, 0);
-        int next_thread_id = (tid_ + 1) % num_threads_;
-        rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head;
+    if (pending_[thread_index]->head - rcu_last_posted_[thread_index][thread_index] >= RCU_POST_THRESHOLD) {
+        TRACE("r0", "rcu_defer_free: posting %llu", pending_[thread_index]->head, 0);
+        int next_thread_index = (thread_index + 1) % num_threads_;
+        rcu_[next_thread_index][thread_index] = pending_[thread_index]->head;
+        rcu_last_posted_[thread_index][thread_index] = pending_[thread_index]->head;
     }
 }
index bb62c76e260b98cf3b43b7a9834cfe69b12a653a..fef861f423a408633e1daf804d739d45525d9110 100644 (file)
@@ -4,9 +4,15 @@
 #include "runtime.h"
 #include "tls.h"
 
+extern DECLARE_THREAD_LOCAL(ThreadId, int);
+
+#define GET_THREAD_INDEX() ({ LOCALIZE_THREAD_LOCAL(ThreadId, int); assert(ThreadId != 0); ThreadId - 1; })
+
 void mem_init (void);
+void rnd_init (void);
 
-void rcu_thread_init (int thread_id);
-void lwt_thread_init (int thread_id);
+void rnd_thread_init (void);
+void rcu_thread_init (void);
+void lwt_thread_init (void);
 
 #endif//RLOCAL_H 
index fe4a1bd894a6dc09d776ffbbb8276d1f7ce2605f..19024e2f389f237ecbd6f0e2edf906b0b198a1d2 100644 (file)
@@ -2,7 +2,6 @@
  * Written by Josh Dybnis and released to the public domain, as explained at
  * http://creativecommons.org/licenses/publicdomain
  */
-#define _POSIX_C_SOURCE 1 // for rand_r()
 #include <stdlib.h>
 #include <pthread.h>
 #include "common.h"
 #include "mem.h"
 #include "tls.h"
 
-DECLARE_THREAD_LOCAL(tid_, int);
-DECLARE_THREAD_LOCAL(rx_, uint32_t);
-DECLARE_THREAD_LOCAL(ry_, uint32_t);
-DECLARE_THREAD_LOCAL(rz_, uint32_t);
-DECLARE_THREAD_LOCAL(rc_, uint32_t);
+DECLARE_THREAD_LOCAL(ThreadId, int);
+static int ThreadIndex
 
-typedef struct thread_info {
-    int thread_id;
-    void *(*start_routine)(void *);
-    void *restrict arg;
-} thread_info_t;
+static int MaxThreadId = 0;
 
 __attribute__ ((constructor)) void nbd_init (void) {
-    INIT_THREAD_LOCAL(r);
-    INIT_THREAD_LOCAL(tid_);
-    SET_THREAD_LOCAL(tid_, 0);
+    rnd_init();
     mem_init();
-    lwt_thread_init(0);
-    rcu_thread_init(0);
-    srand((uint32_t)rdtsc());
 }
 
-static void *worker (void *arg) {
-    thread_info_t *ti = (thread_info_t *)arg;
-    SET_THREAD_LOCAL(tid_, ti->thread_id);
-    LOCALIZE_THREAD_LOCAL(tid_, int);
+void nbd_thread_init (void) {
+    LOCALIZE_THREAD_LOCAL(ThreadId, int);
 
-    SET_THREAD_LOCAL(rx_, rand());
-    SET_THREAD_LOCAL(ry_, rand());
-    SET_THREAD_LOCAL(rz_, rand());
-    SET_THREAD_LOCAL(rc_, rand());
+    if (ThreadId == 0) {
+        ++MaxThreadId; // TODO: reuse thread id's of threads that have been destroyed
+        ASSERT(MaxThreadId <= MAX_NUM_THREADS);
+        SET_THREAD_LOCAL(ThreadId, MaxThreadId);
+        rnd_thread_init();
+    } 
 
-    lwt_thread_init(ti->thread_id);
-    rcu_thread_init(ti->thread_id);
-
-    void *ret = ti->start_routine(ti->arg);
-    nbd_free(ti);
-    return ret;
-}
-
-int nbd_thread_create (pthread_t *restrict thread, int thread_id, void *(*start_routine)(void *), void *restrict arg) {
-    thread_info_t *ti = (thread_info_t *)nbd_malloc(sizeof(thread_info_t));
-    ti->thread_id = thread_id;
-    ti->start_routine = start_routine;
-    ti->arg = arg;
-    return pthread_create(thread, NULL, worker, ti);
-}
-
-// George Marsaglia's KISS generator
-uint64_t nbd_rand (void) {
-    LOCALIZE_THREAD_LOCAL(rx_, unsigned);
-    LOCALIZE_THREAD_LOCAL(ry_, unsigned);
-    LOCALIZE_THREAD_LOCAL(rz_, unsigned);
-    LOCALIZE_THREAD_LOCAL(rc_, unsigned);
-
-    uint32_t rx = 69069 * rx_ + 12345;
-    uint32_t ry = ry_;
-    uint32_t rz = rz_;
-    ry ^= (ry << 13);
-    ry ^= (ry >> 17);
-    ry ^= (ry <<  5);
-    uint64_t t = rz * 698769069LL + rc_;
-    uint64_t r = rx + ry + (rz = t);
-
-    SET_THREAD_LOCAL(rx_, rx);
-    SET_THREAD_LOCAL(ry_, ry);
-    SET_THREAD_LOCAL(rz_, rz);
-    SET_THREAD_LOCAL(rc_, t >> 32);
-
-    return r;
-}
-
-// Fairly fast random numbers
-uint64_t nbd_rand_seed (int i) {
-    return rdtsc() + -715159705 + i * 129;
-}
-
-int nbd_next_rand (uint64_t *r) {
-    *r = (*r * 0x5DEECE66DLL + 0xBLL) & MASK(48);
-    return (*r >> 17) & 0x7FFFFFFF;
+    lwt_thread_init();
+    rcu_thread_init();
 }
index f05fa3dea001871492aeb8e75ba166c221c11037..11fa5d45aba7a823911e3c15070585b6ef308fe6 100644 (file)
@@ -21,6 +21,7 @@ static long num_threads_;
 static map_t *map_;
 
 void *worker (void *arg) {
+    nbd_thread_init();
 
     // Wait for all the worker threads to be ready.
     (void)SYNC_ADD(&wait_, -1);
@@ -56,6 +57,7 @@ void *worker (void *arg) {
 }
 
 int main (int argc, char **argv) {
+    nbd_thread_init();
     lwt_set_trace_level("r0m3s3");
 
     char* program_name = argv[0];
@@ -99,7 +101,7 @@ int main (int argc, char **argv) {
         wait_ = num_threads_;
 
         for (int i = 0; i < num_threads_; ++i) {
-            int rc = nbd_thread_create(thread + i, i, worker, (void*)(size_t)i);
+            int rc = pthread_create(thread + i, NULL, worker, (void*)(size_t)i);
             if (rc != 0) { perror("pthread_create"); return rc; }
         }
 
@@ -109,7 +111,7 @@ int main (int argc, char **argv) {
 
         gettimeofday(&tv2, NULL);
         int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000;
-        map_print(map_);
+        map_print(map_, FALSE);
         printf("Th:%ld Time:%dms\n\n", num_threads_, ms);
         fflush(stdout);
     }
index 52438ea763364f5384b86aaf189cf0d87d6ff005..f8e0abd85b7f3958df026007b017c78db3d40729 100644 (file)
@@ -132,6 +132,8 @@ void basic_test (CuTest* tc) {
 }
 
 void *add_remove_worker (void *arg) {
+    nbd_thread_init();
+
     worker_data_t *wd = (worker_data_t *)arg;
     map_t *map = wd->map;
     CuTest* tc = wd->tc;
@@ -197,7 +199,7 @@ void concurrent_add_remove_test (CuTest* tc) {
         wd[i].tc = tc;
         wd[i].map = map;
         wd[i].wait = &wait;
-        int rc = nbd_thread_create(thread + i, i, add_remove_worker, wd + i);
+        int rc = pthread_create(thread + i, NULL, add_remove_worker, wd + i);
         if (rc != 0) { perror("nbd_thread_create"); return; }
     }
 
@@ -207,7 +209,7 @@ void concurrent_add_remove_test (CuTest* tc) {
 
     gettimeofday(&tv2, NULL);
     int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000;
-    map_print(map);
+    map_print(map, FALSE);
     printf("Time:%dms\n", ms);
     fflush(stdout);
 
@@ -315,6 +317,7 @@ void big_iteration_test (CuTest* tc) {
 }
 
 int main (void) {
+    nbd_thread_init();
     lwt_set_trace_level("r0m3l2t0");
 
     static const map_impl_t *map_types[] = { &MAP_IMPL_LL, &MAP_IMPL_SL, &MAP_IMPL_HT };
index 1ea0bcb3eb11f831c9ca50991213121b0e659cf8..088956efefbbd115bca68532671d7398ed09bca0 100644 (file)
@@ -27,7 +27,7 @@ static int duration_;
 #define OP_SELECT_RANGE (1ULL << 20)
 
 void *worker (void *arg) {
-    volatile uint64_t ops = 0;
+    nbd_thread_init();
 
     // Wait for all the worker threads to be ready.
     (void)SYNC_ADD(&load_, -1);
@@ -44,6 +44,7 @@ void *worker (void *arg) {
     (void)SYNC_ADD(&start_, -1);
     do {} while (start_);
 
+    uint64_t ops = 0;
     while (!stop_) {
         ++ops;
         map_key_t key = (nbd_rand() & (num_keys_ - 1)) + 1;
@@ -77,7 +78,7 @@ uint64_t run_test (void) {
 
     pthread_t thread[MAX_NUM_THREADS];
     for (int i = 0; i < num_threads_; ++i) {
-        int rc = nbd_thread_create(thread + i, i, worker, (void*)(size_t)i);
+        int rc = pthread_create(thread + i, NULL, worker, (void*)(size_t)i);
         if (rc != 0) { perror("pthread_create"); exit(rc); }
     }
 
@@ -152,6 +153,7 @@ int main (int argc, char **argv) {
     get_range_ = (int)((double)OP_SELECT_RANGE / 100 * read_ratio);
     put_range_ = get_range_ + (int)(((double)OP_SELECT_RANGE - get_range_) / 100 * put_ratio);
 
+    nbd_thread_init();
     static const map_impl_t *map_types[] = { &MAP_IMPL_HT };
     for (int i = 0; i < sizeof(map_types)/sizeof(*map_types); ++i) {
 #ifdef TEST_STRING_KEYS
index b2dedaae418d974406b0697cbdef7457efb107c5..b2258554ad037684494509a3768df61581f73fb3 100644 (file)
@@ -1,7 +1,7 @@
-#define _POSIX_C_SOURCE 1 // for rand_r
 #include <stdio.h>
 #include <errno.h>
 #include <pthread.h>
+#include <unistd.h>
 #include <sys/time.h>
 #include "common.h"
 #include "runtime.h"
@@ -53,8 +53,7 @@ node_t *node_alloc (void) {
 }
 
 void *worker (void *arg) {
-    int id = (int)(size_t)arg;
-    unsigned int rand_seed = (unsigned int)id + 1;
+    nbd_thread_init();
 
     // Wait for all the worker threads to be ready.
     (void)__sync_fetch_and_add(&wait_, -1);
@@ -62,7 +61,7 @@ void *worker (void *arg) {
 
     int i;
     for (i = 0; i < NUM_ITERATIONS; ++ i) {
-        int n = rand_r(&rand_seed);
+        int n = nbd_rand();
         if (n & 0x1) {
             lifo_aba_push(stk_, node_alloc());
         } else {
@@ -78,9 +77,10 @@ void *worker (void *arg) {
 }
 
 int main (int argc, char **argv) {
+    nbd_thread_init();
     lwt_set_trace_level("m3r3");
 
-    int num_threads = MAX_NUM_THREADS;
+    int num_threads = sysconf(_SC_NPROCESSORS_CONF);
     if (argc == 2)
     {
         errno = 0;
@@ -104,7 +104,7 @@ int main (int argc, char **argv) {
 
     pthread_t thread[num_threads];
     for (int i = 0; i < num_threads; ++i) {
-        int rc = nbd_thread_create(thread + i, i, worker, (void *)(size_t)i);
+        int rc = pthread_create(thread + i, NULL, worker, (void *)(size_t)i);
         if (rc != 0) { perror("pthread_create"); return rc; }
     }
     for (int i = 0; i < num_threads; ++i) {
index 0a5045d753f8e45654963437423e92f479b109e3..4728ef971ec3553584efad4fd0e62d550ed64c19 100644 (file)
@@ -25,7 +25,7 @@ void test1 (CuTest* tc) {
 }
 
 int main (void) {
-
+    nbd_thread_init();
     lwt_set_trace_level("x3h3");
 
     CuString *output = CuStringNew();