]> pd.if.org Git - nbds/commitdiff
revert to working upstream version, fix osx build osxbuild
authorargv0 <andy@andygross.org>
Tue, 16 Aug 2011 22:15:54 +0000 (15:15 -0700)
committerargv0 <andy@andygross.org>
Tue, 16 Aug 2011 22:15:54 +0000 (15:15 -0700)
17 files changed:
include/lwt.h
include/mem.h
include/runtime.h
include/tls.h
makefile
runtime/hazard.c
runtime/lwt.c
runtime/mem.c
runtime/random.c [deleted file]
runtime/rcu.c
runtime/rlocal.h
runtime/runtime.c
test/map_test1.c
test/map_test2.c
test/perf_test.c
test/rcu_test.c
test/txn_test.c

index 085f6dd4031c88f67f9eb0fd8f8d65cb8d86b1bd..5c0b6a4da92e6c7c4dec6e86b7739f8b7c5398e9 100644 (file)
@@ -34,8 +34,8 @@ void lwt_set_trace_level (const char *flags);
 // the dump. It is only included when its specified category is enabled at a trace level greater than or equal to
 // the one in <flag>. Categories are case sensitive. 
 static inline void lwt_trace (const char *flag, const char *format, size_t value1, size_t value2) {
-    extern char TraceLevel[256];
-    if (EXPECT_FALSE(TraceLevel[(unsigned)flag[0]] >= flag[1])) {
+    extern char flag_state_[256];
+    if (EXPECT_FALSE(flag_state_[(unsigned)flag[0]] >= flag[1])) {
         // embed <flags> in <format> so we don't have to make the lwt_record_t any bigger than it already is
         uint64_t f = ((uint64_t)(size_t)format | ((uint64_t)flag[0] << 56) | ((uint64_t)flag[1] << 48));
         extern void lwt_trace_i (uint64_t format, size_t value1, size_t value2);
index 91b2757475d0883c91c9585bb1adaa7dde730937..7d5f829a0cb11ff620b57d1457f7b8198b9de1cb 100644 (file)
@@ -4,6 +4,6 @@
  */
 #ifndef MEM_H
 #define MEM_H
-void *nbd_malloc (size_t n) __attribute__((malloc, alloc_size(1)));
+void *nbd_malloc (size_t n) __attribute__((malloc));
 void nbd_free (void *x) __attribute__((nonnull));
 #endif//MEM_H
index 4fe4cbf67558a989081f48b996e50b5b61e80912..e453fecac9b9425123b05170c7e0c10f5b22df29 100644 (file)
@@ -8,7 +8,11 @@
 #include <pthread.h>
 #include "tls.h"
 
-void nbd_thread_init (void);
+extern DECLARE_THREAD_LOCAL(tid_, int);
+
+int nbd_thread_create (pthread_t *restrict thread, int thread_id, void *(*start_routine)(void *), void *restrict arg);
 uint64_t nbd_rand (void);
+uint64_t nbd_rand_seed (int i);
+int nbd_next_rand (uint64_t *r);
 
 #endif//RUNTIME_H
index c496dab9faac6e14a205e771db558ecbd55bbd34..5f3d0e17ca784c274a9d41a0cc6e94c0126e1ef3 100644 (file)
 
 #define INIT_THREAD_LOCAL(name) \
     do { \
-        if (pthread_key_create(&name##_KEY, NULL) != 0) { \
-            assert("error initializing thread local variable " #name, FALSE); \
-        } \
+        if (pthread_key_create(&name##_KEY, NULL) != 0) { assert(FALSE); } \
     } while (0)
 
-#define SET_THREAD_LOCAL(name, value) \
-    do { \
-        name = value; \
-        pthread_setspecific(name##_KEY, (void *)(size_t)value); \
-    } while (0);
+#define SET_THREAD_LOCAL(name, value) pthread_setspecific(name##_KEY, (void *)(size_t)value);
 
 #define LOCALIZE_THREAD_LOCAL(name, type) type name = (type)(size_t)pthread_getspecific(name##_KEY)
 
index d9863f2a0111bec1fc8069fcfff905bb347ac8eb..0130026c8e0a7a78abb9dfd86ecb75c9a04f1c0b 100644 (file)
--- a/makefile
+++ b/makefile
@@ -5,16 +5,15 @@
 # Makefile for building programs with whole-program interfile optimization
 ###################################################################################################
 CFLAGS0 := -Wall -Werror -std=gnu99 -lpthread #-m32 -DNBD32
-CFLAGS1 := $(CFLAGS0) -g #-DNDEBUG #-fwhole-program -combine
+CFLAGS1 := $(CFLAGS0) -g -O3 #-DNDEBUG #-fwhole-program -combine
 CFLAGS2 := $(CFLAGS1) #-DENABLE_TRACE
 CFLAGS3 := $(CFLAGS2) #-DLIST_USE_HAZARD_POINTER
 CFLAGS  := $(CFLAGS3) #-DNBD_SINGLE_THREADED #-DUSE_SYSTEM_MALLOC #-DTEST_STRING_KEYS
 INCS    := $(addprefix -I, include)
-TESTS   := output/perf_test output/map_test1 output/map_test2 output/rcu_test output/txn_test #output/haz_test
+TESTS   := output/perf_test #output/map_test1 output/map_test2 output/rcu_test output/txn_test #output/haz_test
 OBJS    := $(TESTS)
 
-RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c runtime/random.c \
-                               datatype/nstring.c #runtime/hazard.c
+RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c datatype/nstring.c #runtime/hazard.c
 MAP_SRCS     := map/map.c map/list.c map/skiplist.c map/hashtable.c
 
 haz_test_SRCS  := $(RUNTIME_SRCS) test/haz_test.c
index 431c57665029b72db1d21c8fd34b90458cce3184..3ebed2ee0ca7f267a0926e5f66e662f6216a7d8c 100644 (file)
@@ -53,8 +53,8 @@ static int search_hazards (void *p, haz_t *hazards, int n) {
 
 static void resize_pending (void) {
     TRACE("H2", "haz_resize_pending", 0, 0);
-    LOCALIZE_THREAD_LOCAL(ThreadId, int);
-    haz_local_t *l = haz_local_ + ThreadId;
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    haz_local_t *l = haz_local_ + tid_;
     pending_t *p = nbd_malloc(sizeof(pending_t) * l->pending_size * 2);
     memcpy(p, l->pending, l->pending_size);
     nbd_free(l->pending);
@@ -66,8 +66,8 @@ void haz_defer_free (void *d, free_t f) {
     TRACE("H1", "haz_defer_free: %p (%p)", d, f);
     assert(d);
     assert(f);
-    LOCALIZE_THREAD_LOCAL(ThreadId, int);
-    haz_local_t *l = haz_local_ + ThreadId;
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    haz_local_t *l = haz_local_ + tid_;
     while (l->pending_count == l->pending_size) {
 
         if (l->pending_size == 0) {
@@ -131,17 +131,17 @@ haz_t *haz_get_static (int i) {
     TRACE("H1", "haz_get_static: %p", i, 0);
     if (i >= STATIC_HAZ_PER_THREAD)
         return NULL;
-    LOCALIZE_THREAD_LOCAL(ThreadId, int);
+    LOCALIZE_THREAD_LOCAL(tid_, int);
     assert(i < STATIC_HAZ_PER_THREAD);
-    haz_t *ret = &haz_local_[ThreadId].static_haz[i];
+    haz_t *ret = &haz_local_[tid_].static_haz[i];
     TRACE("H1", "haz_get_static: returning %p", ret, 0);
     return ret;
 }
 
 void haz_register_dynamic (haz_t *haz) {
     TRACE("H1", "haz_register_dynamic: %p", haz, 0);
-    LOCALIZE_THREAD_LOCAL(ThreadId, int);
-    haz_local_t *l = haz_local_ + ThreadId;
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    haz_local_t *l = haz_local_ + tid_;
 
     if (l->dynamic_size == 0) {
         int n = MAX_NUM_THREADS * STATIC_HAZ_PER_THREAD;
@@ -163,8 +163,8 @@ void haz_register_dynamic (haz_t *haz) {
 // assumes <haz> was registered in the same thread
 void haz_unregister_dynamic (void **haz) {
     TRACE("H1", "haz_unregister_dynamic: %p", haz, 0);
-    LOCALIZE_THREAD_LOCAL(ThreadId, int);
-    haz_local_t *l = haz_local_ + ThreadId;
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    haz_local_t *l = haz_local_ + tid_;
 
     for (int i = 0; i < l->dynamic_count; ++i) {
         if (l->dynamic[i] == haz) {
index 47bfcebb53f31760924139e51ccede621e604a01..a986567bcb05023f8e6bafb541035fe5266f9900 100644 (file)
@@ -28,34 +28,35 @@ typedef struct lwt_buffer {
     lwt_record_t x[0];
 } lwt_buffer_t;
 
-lwt_buffer_t *TraceBuffer[MAX_NUM_THREADS] = {};
-char TraceLevel[256] = {};
-static const char *TraceSpec = "";
-
-void lwt_thread_init (void) {
-    int thread_index = GET_THREAD_INDEX();
-
-    if (TraceBuffer[thread_index] == NULL) {
-        TraceBuffer[thread_index] = 
-            (lwt_buffer_t *)nbd_malloc(sizeof(lwt_buffer_t) + sizeof(lwt_record_t) * LWT_BUFFER_SIZE);
-        memset(TraceBuffer[thread_index], 0, sizeof(lwt_buffer_t));
+lwt_buffer_t *lwt_buf_[MAX_NUM_THREADS] = {};
+char flag_state_[256] = {};
+static const char *flags_ = "";
+
+void lwt_thread_init (int thread_id)
+{
+    assert(thread_id < MAX_NUM_THREADS);
+    if (lwt_buf_[thread_id] == NULL) {
+        lwt_buf_[thread_id] = (lwt_buffer_t *)nbd_malloc(sizeof(lwt_buffer_t) + sizeof(lwt_record_t) * LWT_BUFFER_SIZE);
+        memset(lwt_buf_[thread_id], 0, sizeof(lwt_buffer_t));
     }
 }
 
-void lwt_set_trace_level (const char *flags) {
+void lwt_set_trace_level (const char *flags)
+{
     assert(strlen(flags) % 2 == 0); // a well formed <flags> should be an even number of characters long
-    TraceSpec = flags;
-    memset(TraceLevel, 0, sizeof(TraceLevel));
+    flags_ = flags;
+    memset(flag_state_, 0, sizeof(flag_state_));
     for (int i = 0; flags[i]; i+=2) {
-        TraceLevel[(unsigned)flags[i]] = flags[i+1];
+        flag_state_[(unsigned)flags[i]] = flags[i+1];
     }
 }
 
-static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset) {
+static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset)
+{
     // print the record if its trace category is enabled at a high enough level
     int flag  =  r->format >> 56;
     int level = (r->format >> 48) & 0xFF;
-    if (TraceLevel[(unsigned)flag] >= level) {
+    if (flag_state_[(unsigned)flag] >= level) {
         char s[3] = {flag, level, '\0'};
         fprintf(file, "%09llu %d %s ", ((uint64_t)r->timestamp - offset) >> 5, thread_id, s);
         const char *format = (const char *)(size_t)(r->format & MASK(48)); // strip out the embedded flags
@@ -64,17 +65,18 @@ static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t of
     }
 }
 
-static void dump_buffer (FILE *file, int thread_index, uint64_t offset) {
-    lwt_buffer_t *tb = TraceBuffer[thread_index]; 
+static void dump_buffer (FILE *file, int thread_id, uint64_t offset)
+{
+    lwt_buffer_t *tb = lwt_buf_[thread_id]; 
     assert(tb);
     if (tb->head > LWT_BUFFER_SIZE) {
         for (int i = tb->head & LWT_BUFFER_MASK; i < LWT_BUFFER_SIZE; ++i) {
-            dump_record(file, thread_index + 1, tb->x + i, offset);
+            dump_record(file, thread_id, tb->x + i, offset);
         }
     }
 
     for (int i = 0; i < (tb->head & LWT_BUFFER_MASK); ++i) {
-        dump_record(file, thread_index + 1, tb->x + i, offset);
+        dump_record(file, thread_id, tb->x + i, offset);
     }
 }
 
@@ -82,19 +84,20 @@ void lwt_halt (void) {
     halt_ = 1;
 }
 
-void lwt_dump (const char *file_name) {
+void lwt_dump (const char *file_name)
+{
     halt_ = 1;
     uint64_t offset = (uint64_t)-1;
 
     for (int i = 0; i < MAX_NUM_THREADS; ++i) {
-        if (TraceBuffer[i] != NULL && TraceBuffer[i]->head != 0) {
-            uint64_t x = TraceBuffer[i]->x[0].timestamp;
+        if (lwt_buf_[i] != NULL && lwt_buf_[i]->head != 0) {
+            uint64_t x = lwt_buf_[i]->x[0].timestamp;
             if (x < offset) {
                 offset = x;
             }
-            if (TraceBuffer[i]->head > LWT_BUFFER_SIZE)
+            if (lwt_buf_[i]->head > LWT_BUFFER_SIZE)
             {
-                x = TraceBuffer[i]->x[TraceBuffer[i]->head & LWT_BUFFER_MASK].timestamp;
+                x = lwt_buf_[i]->x[lwt_buf_[i]->head & LWT_BUFFER_MASK].timestamp;
                 if (x < offset) {
                     offset = x;
                 }
@@ -106,7 +109,7 @@ void lwt_dump (const char *file_name) {
         FILE *file = fopen(file_name, "w");
         assert(file);
         for (int i = 0; i < MAX_NUM_THREADS; ++i) {
-            if (TraceBuffer[i] != NULL) {
+            if (lwt_buf_[i] != NULL) {
                 dump_buffer(file, i, offset);
             }
         }
@@ -117,7 +120,8 @@ void lwt_dump (const char *file_name) {
 
 void lwt_trace_i (uint64_t format, size_t value1, size_t value2) {
     while (halt_) {}
-    lwt_buffer_t *tb = TraceBuffer[GET_THREAD_INDEX()];
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    lwt_buffer_t *tb = lwt_buf_[tid_];
     if (tb != NULL) {
         unsigned int u, l;
         __asm__ __volatile__("rdtsc" : "=a" (l), "=d" (u)); 
index 2f55ff4d2108d93522d878e4549ef5812cc10721..90a22e1f90b57e3d8b31d17618be2a3b95647258 100644 (file)
@@ -74,9 +74,9 @@ static inline header_t *get_header (void *r) {
 }
 
 static void *get_new_region (int block_scale) {
-    int thread_index = GET_THREAD_INDEX();
+    LOCALIZE_THREAD_LOCAL(tid_, int);
 #ifdef RECYCLE_PAGES
-    tl_t *tl = &tl_[thread_index]; // thread-local data
+    tl_t *tl = &tl_[tid_]; // thread-local data
     if (block_scale <= PAGE_SCALE && tl->free_pages != NULL) {
         void *region = tl->free_pages;
         tl->free_pages = tl->free_pages->next;
@@ -122,7 +122,7 @@ static void *get_new_region (int block_scale) {
     TRACE("m1", "get_new_region: header %p (%p)", h, h - headers_);
     assert(h->scale == 0);
     h->scale = block_scale;
-    h->owner = thread_index;
+    h->owner = tid_;
 
     return region;
 }
@@ -148,6 +148,7 @@ void mem_init (void) {
 void nbd_free (void *x) {
     TRACE("m1", "nbd_free: block %p page %p", x, (size_t)x & ~MASK(PAGE_SCALE));
     ASSERT(x);
+    LOCALIZE_THREAD_LOCAL(tid_, int);
     block_t  *b = (block_t *)x;
     header_t *h = get_header(x);
     int b_scale = h->scale;
@@ -163,9 +164,8 @@ void nbd_free (void *x) {
 #ifndef NDEBUG
     memset(b, 0xcd, (1ULL << b_scale)); // bear trap
 #endif
-    int thread_index = GET_THREAD_INDEX();
-    tl_t *tl = &tl_[thread_index]; // thread-local data
-    if (h->owner == thread_index) {
+    tl_t *tl = &tl_[tid_]; // thread-local data
+    if (h->owner == tid_) {
         TRACE("m1", "nbd_free: private block, old free list head %p", tl->free_list[b_scale], 0);
 
 #ifndef RECYCLE_PAGES
@@ -264,7 +264,8 @@ void *nbd_malloc (size_t n) {
     if (EXPECT_FALSE(b_scale < MIN_SCALE)) { b_scale = MIN_SCALE; }
     if (EXPECT_FALSE(b_scale > MAX_SCALE)) { return NULL; }
 
-    tl_t *tl = &tl_[GET_THREAD_INDEX()]; // thread-local data
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    tl_t *tl = &tl_[tid_]; // thread-local data
 
     block_t *b = pop_free_list(tl, b_scale);
     if (b != NULL) {
diff --git a/runtime/random.c b/runtime/random.c
deleted file mode 100644 (file)
index d6d1cf0..0000000
+++ /dev/null
@@ -1,79 +0,0 @@
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <pthread.h>
-#include <unistd.h>
-#include <fcntl.h>
-
-#include "common.h"
-#include "runtime.h"
-
-DECLARE_THREAD_LOCAL(rx_, uint32_t);
-DECLARE_THREAD_LOCAL(ry_, uint32_t);
-DECLARE_THREAD_LOCAL(rz_, uint32_t);
-DECLARE_THREAD_LOCAL(rc_, uint32_t);
-
-void rnd_init (void) {
-    INIT_THREAD_LOCAL(rx_);
-    INIT_THREAD_LOCAL(ry_);
-    INIT_THREAD_LOCAL(rz_);
-    INIT_THREAD_LOCAL(rc_);
-}
-
-// TODO: put a lock around this so that multiple threads being initialize concurrently don't read
-//       the same values from /dev/urandom
-void rnd_thread_init (void) {
-    int fd = open("/dev/urandom", O_RDONLY);
-    if (fd == -1) {
-        perror("Error opening /dev/urandom");
-        exit(1);
-    }
-
-    char buf[16];
-
-    int n = read(fd, buf, sizeof(buf));
-    if (n != 16) {
-        if (n == -1) {
-            perror("Error reading from /dev/urandom");
-        }
-        fprintf(stderr, "Could not read enough bytes from /dev/urandom");
-        exit(1);
-    }
-
-    uint32_t x, y, z, c;
-    memcpy(&x, buf +  0, 4);
-    memcpy(&y, buf +  4, 4);
-    memcpy(&z, buf +  8, 4);
-    memcpy(&c, buf + 12, 4);
-
-    SET_THREAD_LOCAL(rx_, x);
-    SET_THREAD_LOCAL(ry_, y);
-    SET_THREAD_LOCAL(rz_, z);
-    SET_THREAD_LOCAL(rc_, z);
-}
-
-// George Marsaglia's KISS generator
-//
-// Even though this returns 64 bits, this algorithm was only designed to generate 32 bits.
-// The upper 32 bits is going to be highly correlated with the lower 32 bits of the next call.
-uint64_t nbd_rand (void) {
-    LOCALIZE_THREAD_LOCAL(rx_, unsigned);
-    LOCALIZE_THREAD_LOCAL(ry_, unsigned);
-    LOCALIZE_THREAD_LOCAL(rz_, unsigned);
-    LOCALIZE_THREAD_LOCAL(rc_, unsigned);
-
-    uint32_t rx = 69069 * rx_ + 12345;
-    uint32_t ry = ry_;
-    ry ^= (ry << 13);
-    ry ^= (ry >> 17);
-    ry ^= (ry <<  5);
-    uint64_t t = rz_ * 698769069LL + rc_;
-    uint64_t r = rx + ry + t;
-
-    SET_THREAD_LOCAL(rx_, rx);
-    SET_THREAD_LOCAL(ry_, ry);
-    SET_THREAD_LOCAL(rz_, t);
-    SET_THREAD_LOCAL(rc_, t >> 32);
-
-    return r;
-}
index 0d65a37f6dda8bca88b9e40e634a663b7637f776..12c37a3d4e37df2d5a340010ed039cde7efc1b3c 100644 (file)
@@ -39,35 +39,36 @@ static fifo_t *fifo_alloc(int scale) {
     return q;
 }
 
-void rcu_thread_init (void) {
-    int thread_index = GET_THREAD_INDEX();
-    if (pending_[thread_index] == NULL) {
-        pending_[thread_index] = fifo_alloc(RCU_QUEUE_SCALE);
+void rcu_thread_init (int id) {
+    assert(id < MAX_NUM_THREADS);
+    if (pending_[id] == NULL) {
+        pending_[id] = fifo_alloc(RCU_QUEUE_SCALE);
         (void)SYNC_ADD(&num_threads_, 1);
     }
 }
 
 void rcu_update (void) {
-    int thread_index = GET_THREAD_INDEX();
-    int next_thread_index = (thread_index + 1) % num_threads_;
-    TRACE("r1", "rcu_update: updating thread %llu", next_thread_index, 0);
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    assert(tid_ < num_threads_);
+    int next_thread_id = (tid_ + 1) % num_threads_;
+    TRACE("r1", "rcu_update: updating thread %llu", next_thread_id, 0);
     int i;
     for (i = 0; i < num_threads_; ++i) {
-        if (i == thread_index)
+        if (i == tid_)
             continue;
 
         // No need to post an update if the value hasn't changed
-        if (rcu_[thread_index][i] == rcu_last_posted_[thread_index][i])
+        if (rcu_[tid_][i] == rcu_last_posted_[tid_][i])
             continue;
 
-        uint64_t x = rcu_[thread_index][i];
-        rcu_[next_thread_index][i] = rcu_last_posted_[thread_index][i] = x;
+        uint64_t x = rcu_[tid_][i];
+        rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x;
         TRACE("r2", "rcu_update: posted updated value (%llu) for thread %llu", x, i);
     }
 
     // free
-    fifo_t *q = pending_[thread_index];
-    while (q->tail != rcu_[thread_index][thread_index]) {
+    fifo_t *q = pending_[tid_];
+    while (q->tail != rcu_[tid_][tid_]) {
         uint32_t i = MOD_SCALE(q->tail, q->scale);
         TRACE("r0", "rcu_update: freeing %p from queue at position %llu", q->x[i], q->tail);
         nbd_free(q->x[i]);
@@ -77,18 +78,17 @@ void rcu_update (void) {
 
 void rcu_defer_free (void *x) {
     assert(x);
-    int thread_index = GET_THREAD_INDEX();
-    fifo_t *q = pending_[thread_index];
+    LOCALIZE_THREAD_LOCAL(tid_, int);
+    fifo_t *q = pending_[tid_];
     assert(MOD_SCALE(q->head + 1, q->scale) != MOD_SCALE(q->tail, q->scale));
     uint32_t i = MOD_SCALE(q->head, q->scale);
     q->x[i] = x;
     TRACE("r0", "rcu_defer_free: put %p on queue at position %llu", x, q->head);
     q->head++;
 
-    if (pending_[thread_index]->head - rcu_last_posted_[thread_index][thread_index] >= RCU_POST_THRESHOLD) {
-        TRACE("r0", "rcu_defer_free: posting %llu", pending_[thread_index]->head, 0);
-        int next_thread_index = (thread_index + 1) % num_threads_;
-        rcu_[next_thread_index][thread_index] = pending_[thread_index]->head;
-        rcu_last_posted_[thread_index][thread_index] = pending_[thread_index]->head;
+    if (pending_[tid_]->head - rcu_last_posted_[tid_][tid_] >= RCU_POST_THRESHOLD) {
+        TRACE("r0", "rcu_defer_free: posting %llu", pending_[tid_]->head, 0);
+        int next_thread_id = (tid_ + 1) % num_threads_;
+        rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head;
     }
 }
index fef861f423a408633e1daf804d739d45525d9110..bb62c76e260b98cf3b43b7a9834cfe69b12a653a 100644 (file)
@@ -4,15 +4,9 @@
 #include "runtime.h"
 #include "tls.h"
 
-extern DECLARE_THREAD_LOCAL(ThreadId, int);
-
-#define GET_THREAD_INDEX() ({ LOCALIZE_THREAD_LOCAL(ThreadId, int); assert(ThreadId != 0); ThreadId - 1; })
-
 void mem_init (void);
-void rnd_init (void);
 
-void rnd_thread_init (void);
-void rcu_thread_init (void);
-void lwt_thread_init (void);
+void rcu_thread_init (int thread_id);
+void lwt_thread_init (int thread_id);
 
 #endif//RLOCAL_H 
index 19024e2f389f237ecbd6f0e2edf906b0b198a1d2..5f6b8d94d0965f85c3fa5c2166b3bd66afd515f1 100644 (file)
@@ -2,6 +2,7 @@
  * Written by Josh Dybnis and released to the public domain, as explained at
  * http://creativecommons.org/licenses/publicdomain
  */
+#define _POSIX_C_SOURCE 1 // for rand_r()
 #include <stdlib.h>
 #include <pthread.h>
 #include "common.h"
 #include "mem.h"
 #include "tls.h"
 
-DECLARE_THREAD_LOCAL(ThreadId, int);
-static int ThreadIndex
+DECLARE_THREAD_LOCAL(tid_, int);
+DECLARE_THREAD_LOCAL(rx_, uint32_t);
+DECLARE_THREAD_LOCAL(ry_, uint32_t);
+DECLARE_THREAD_LOCAL(rz_, uint32_t);
+DECLARE_THREAD_LOCAL(rc_, uint32_t);
 
-static int MaxThreadId = 0;
+typedef struct thread_info {
+    int thread_id;
+    void *(*start_routine)(void *);
+    void *restrict arg;
+} thread_info_t;
 
 __attribute__ ((constructor)) void nbd_init (void) {
-    rnd_init();
+    //    INIT_THREAD_LOCAL(r);
+    INIT_THREAD_LOCAL(tid_);
+    SET_THREAD_LOCAL(tid_, 0);
     mem_init();
+    lwt_thread_init(0);
+    rcu_thread_init(0);
+    srand((uint32_t)rdtsc());
 }
 
-void nbd_thread_init (void) {
-    LOCALIZE_THREAD_LOCAL(ThreadId, int);
+static void *worker (void *arg) {
+    thread_info_t *ti = (thread_info_t *)arg;
+    SET_THREAD_LOCAL(tid_, ti->thread_id);
+    //LOCALIZE_THREAD_LOCAL(tid_, int);
 
-    if (ThreadId == 0) {
-        ++MaxThreadId; // TODO: reuse thread id's of threads that have been destroyed
-        ASSERT(MaxThreadId <= MAX_NUM_THREADS);
-        SET_THREAD_LOCAL(ThreadId, MaxThreadId);
-        rnd_thread_init();
-    } 
+    SET_THREAD_LOCAL(rx_, rand());
+    SET_THREAD_LOCAL(ry_, rand());
+    SET_THREAD_LOCAL(rz_, rand());
+    SET_THREAD_LOCAL(rc_, rand());
 
-    lwt_thread_init();
-    rcu_thread_init();
+    lwt_thread_init(ti->thread_id);
+    rcu_thread_init(ti->thread_id);
+
+    void *ret = ti->start_routine(ti->arg);
+    nbd_free(ti);
+    return ret;
+}
+
+int nbd_thread_create (pthread_t *restrict thread, int thread_id, void *(*start_routine)(void *), void *restrict arg) {
+    thread_info_t *ti = (thread_info_t *)nbd_malloc(sizeof(thread_info_t));
+    ti->thread_id = thread_id;
+    ti->start_routine = start_routine;
+    ti->arg = arg;
+    return pthread_create(thread, NULL, worker, ti);
+}
+
+// George Marsaglia's KISS generator
+uint64_t nbd_rand (void) {
+    LOCALIZE_THREAD_LOCAL(rx_, unsigned);
+    LOCALIZE_THREAD_LOCAL(ry_, unsigned);
+    LOCALIZE_THREAD_LOCAL(rz_, unsigned);
+    LOCALIZE_THREAD_LOCAL(rc_, unsigned);
+
+    uint32_t rx = 69069 * rx_ + 12345;
+    uint32_t ry = ry_;
+    uint32_t rz = rz_;
+    ry ^= (ry << 13);
+    ry ^= (ry >> 17);
+    ry ^= (ry <<  5);
+    uint64_t t = rz * 698769069LL + rc_;
+    uint64_t r = rx + ry + (rz = t);
+
+    SET_THREAD_LOCAL(rx_, rx);
+    SET_THREAD_LOCAL(ry_, ry);
+    SET_THREAD_LOCAL(rz_, rz);
+    SET_THREAD_LOCAL(rc_, (unsigned)(t >> 32));
+
+    return r;
+}
+
+// Fairly fast random numbers
+uint64_t nbd_rand_seed (int i) {
+    return rdtsc() + -715159705 + i * 129;
+}
+
+int nbd_next_rand (uint64_t *r) {
+    *r = (*r * 0x5DEECE66DLL + 0xBLL) & MASK(48);
+    return (*r >> 17) & 0x7FFFFFFF;
 }
index 11fa5d45aba7a823911e3c15070585b6ef308fe6..f05fa3dea001871492aeb8e75ba166c221c11037 100644 (file)
@@ -21,7 +21,6 @@ static long num_threads_;
 static map_t *map_;
 
 void *worker (void *arg) {
-    nbd_thread_init();
 
     // Wait for all the worker threads to be ready.
     (void)SYNC_ADD(&wait_, -1);
@@ -57,7 +56,6 @@ void *worker (void *arg) {
 }
 
 int main (int argc, char **argv) {
-    nbd_thread_init();
     lwt_set_trace_level("r0m3s3");
 
     char* program_name = argv[0];
@@ -101,7 +99,7 @@ int main (int argc, char **argv) {
         wait_ = num_threads_;
 
         for (int i = 0; i < num_threads_; ++i) {
-            int rc = pthread_create(thread + i, NULL, worker, (void*)(size_t)i);
+            int rc = nbd_thread_create(thread + i, i, worker, (void*)(size_t)i);
             if (rc != 0) { perror("pthread_create"); return rc; }
         }
 
@@ -111,7 +109,7 @@ int main (int argc, char **argv) {
 
         gettimeofday(&tv2, NULL);
         int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000;
-        map_print(map_, FALSE);
+        map_print(map_);
         printf("Th:%ld Time:%dms\n\n", num_threads_, ms);
         fflush(stdout);
     }
index f8e0abd85b7f3958df026007b017c78db3d40729..52438ea763364f5384b86aaf189cf0d87d6ff005 100644 (file)
@@ -132,8 +132,6 @@ void basic_test (CuTest* tc) {
 }
 
 void *add_remove_worker (void *arg) {
-    nbd_thread_init();
-
     worker_data_t *wd = (worker_data_t *)arg;
     map_t *map = wd->map;
     CuTest* tc = wd->tc;
@@ -199,7 +197,7 @@ void concurrent_add_remove_test (CuTest* tc) {
         wd[i].tc = tc;
         wd[i].map = map;
         wd[i].wait = &wait;
-        int rc = pthread_create(thread + i, NULL, add_remove_worker, wd + i);
+        int rc = nbd_thread_create(thread + i, i, add_remove_worker, wd + i);
         if (rc != 0) { perror("nbd_thread_create"); return; }
     }
 
@@ -209,7 +207,7 @@ void concurrent_add_remove_test (CuTest* tc) {
 
     gettimeofday(&tv2, NULL);
     int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000;
-    map_print(map, FALSE);
+    map_print(map);
     printf("Time:%dms\n", ms);
     fflush(stdout);
 
@@ -317,7 +315,6 @@ void big_iteration_test (CuTest* tc) {
 }
 
 int main (void) {
-    nbd_thread_init();
     lwt_set_trace_level("r0m3l2t0");
 
     static const map_impl_t *map_types[] = { &MAP_IMPL_LL, &MAP_IMPL_SL, &MAP_IMPL_HT };
index 088956efefbbd115bca68532671d7398ed09bca0..1ea0bcb3eb11f831c9ca50991213121b0e659cf8 100644 (file)
@@ -27,7 +27,7 @@ static int duration_;
 #define OP_SELECT_RANGE (1ULL << 20)
 
 void *worker (void *arg) {
-    nbd_thread_init();
+    volatile uint64_t ops = 0;
 
     // Wait for all the worker threads to be ready.
     (void)SYNC_ADD(&load_, -1);
@@ -44,7 +44,6 @@ void *worker (void *arg) {
     (void)SYNC_ADD(&start_, -1);
     do {} while (start_);
 
-    uint64_t ops = 0;
     while (!stop_) {
         ++ops;
         map_key_t key = (nbd_rand() & (num_keys_ - 1)) + 1;
@@ -78,7 +77,7 @@ uint64_t run_test (void) {
 
     pthread_t thread[MAX_NUM_THREADS];
     for (int i = 0; i < num_threads_; ++i) {
-        int rc = pthread_create(thread + i, NULL, worker, (void*)(size_t)i);
+        int rc = nbd_thread_create(thread + i, i, worker, (void*)(size_t)i);
         if (rc != 0) { perror("pthread_create"); exit(rc); }
     }
 
@@ -153,7 +152,6 @@ int main (int argc, char **argv) {
     get_range_ = (int)((double)OP_SELECT_RANGE / 100 * read_ratio);
     put_range_ = get_range_ + (int)(((double)OP_SELECT_RANGE - get_range_) / 100 * put_ratio);
 
-    nbd_thread_init();
     static const map_impl_t *map_types[] = { &MAP_IMPL_HT };
     for (int i = 0; i < sizeof(map_types)/sizeof(*map_types); ++i) {
 #ifdef TEST_STRING_KEYS
index b2258554ad037684494509a3768df61581f73fb3..b2dedaae418d974406b0697cbdef7457efb107c5 100644 (file)
@@ -1,7 +1,7 @@
+#define _POSIX_C_SOURCE 1 // for rand_r
 #include <stdio.h>
 #include <errno.h>
 #include <pthread.h>
-#include <unistd.h>
 #include <sys/time.h>
 #include "common.h"
 #include "runtime.h"
@@ -53,7 +53,8 @@ node_t *node_alloc (void) {
 }
 
 void *worker (void *arg) {
-    nbd_thread_init();
+    int id = (int)(size_t)arg;
+    unsigned int rand_seed = (unsigned int)id + 1;
 
     // Wait for all the worker threads to be ready.
     (void)__sync_fetch_and_add(&wait_, -1);
@@ -61,7 +62,7 @@ void *worker (void *arg) {
 
     int i;
     for (i = 0; i < NUM_ITERATIONS; ++ i) {
-        int n = nbd_rand();
+        int n = rand_r(&rand_seed);
         if (n & 0x1) {
             lifo_aba_push(stk_, node_alloc());
         } else {
@@ -77,10 +78,9 @@ void *worker (void *arg) {
 }
 
 int main (int argc, char **argv) {
-    nbd_thread_init();
     lwt_set_trace_level("m3r3");
 
-    int num_threads = sysconf(_SC_NPROCESSORS_CONF);
+    int num_threads = MAX_NUM_THREADS;
     if (argc == 2)
     {
         errno = 0;
@@ -104,7 +104,7 @@ int main (int argc, char **argv) {
 
     pthread_t thread[num_threads];
     for (int i = 0; i < num_threads; ++i) {
-        int rc = pthread_create(thread + i, NULL, worker, (void *)(size_t)i);
+        int rc = nbd_thread_create(thread + i, i, worker, (void *)(size_t)i);
         if (rc != 0) { perror("pthread_create"); return rc; }
     }
     for (int i = 0; i < num_threads; ++i) {
index 4728ef971ec3553584efad4fd0e62d550ed64c19..0a5045d753f8e45654963437423e92f479b109e3 100644 (file)
@@ -25,7 +25,7 @@ void test1 (CuTest* tc) {
 }
 
 int main (void) {
-    nbd_thread_init();
+
     lwt_set_trace_level("x3h3");
 
     CuString *output = CuStringNew();