From: jdybnis Date: Sun, 10 May 2009 02:11:03 +0000 (+0000) Subject: work in progress X-Git-Url: https://pd.if.org/git/?p=nbds;a=commitdiff_plain work in progress Warning: does not compile --- diff --git a/include/lwt.h b/include/lwt.h index 5c0b6a4..085f6dd 100644 --- a/include/lwt.h +++ b/include/lwt.h @@ -34,8 +34,8 @@ void lwt_set_trace_level (const char *flags); // the dump. It is only included when its specified category is enabled at a trace level greater than or equal to // the one in . Categories are case sensitive. static inline void lwt_trace (const char *flag, const char *format, size_t value1, size_t value2) { - extern char flag_state_[256]; - if (EXPECT_FALSE(flag_state_[(unsigned)flag[0]] >= flag[1])) { + extern char TraceLevel[256]; + if (EXPECT_FALSE(TraceLevel[(unsigned)flag[0]] >= flag[1])) { // embed in so we don't have to make the lwt_record_t any bigger than it already is uint64_t f = ((uint64_t)(size_t)format | ((uint64_t)flag[0] << 56) | ((uint64_t)flag[1] << 48)); extern void lwt_trace_i (uint64_t format, size_t value1, size_t value2); diff --git a/include/runtime.h b/include/runtime.h index e453fec..4fe4cbf 100644 --- a/include/runtime.h +++ b/include/runtime.h @@ -8,11 +8,7 @@ #include #include "tls.h" -extern DECLARE_THREAD_LOCAL(tid_, int); - -int nbd_thread_create (pthread_t *restrict thread, int thread_id, void *(*start_routine)(void *), void *restrict arg); +void nbd_thread_init (void); uint64_t nbd_rand (void); -uint64_t nbd_rand_seed (int i); -int nbd_next_rand (uint64_t *r); #endif//RUNTIME_H diff --git a/include/tls.h b/include/tls.h index 5f3d0e1..c496dab 100644 --- a/include/tls.h +++ b/include/tls.h @@ -22,10 +22,16 @@ #define INIT_THREAD_LOCAL(name) \ do { \ - if (pthread_key_create(&name##_KEY, NULL) != 0) { assert(FALSE); } \ + if (pthread_key_create(&name##_KEY, NULL) != 0) { \ + assert(FALSE && "error initializing thread local variable " #name); /* one-argument assert; the message shows up in the failure text */ \ + } \ } while (0) -#define SET_THREAD_LOCAL(name, value) 
pthread_setspecific(name##_KEY, (void *)(size_t)value); +#define SET_THREAD_LOCAL(name, value) \ + do { \ + name = (value); /* also updates the variable <name>, e.g. the copy made by LOCALIZE_THREAD_LOCAL; <value> is evaluated twice */ \ + pthread_setspecific(name##_KEY, (void *)(size_t)(value)); \ + } while (0) #define LOCALIZE_THREAD_LOCAL(name, type) type name = (type)(size_t)pthread_getspecific(name##_KEY) diff --git a/makefile b/makefile index 0130026..d9863f2 100644 --- a/makefile +++ b/makefile @@ -5,15 +5,16 @@ # Makefile for building programs with whole-program interfile optimization ################################################################################################### CFLAGS0 := -Wall -Werror -std=gnu99 -lpthread #-m32 -DNBD32 -CFLAGS1 := $(CFLAGS0) -g -O3 #-DNDEBUG #-fwhole-program -combine +CFLAGS1 := $(CFLAGS0) -g #-DNDEBUG #-fwhole-program -combine CFLAGS2 := $(CFLAGS1) #-DENABLE_TRACE CFLAGS3 := $(CFLAGS2) #-DLIST_USE_HAZARD_POINTER CFLAGS := $(CFLAGS3) #-DNBD_SINGLE_THREADED #-DUSE_SYSTEM_MALLOC #-DTEST_STRING_KEYS INCS := $(addprefix -I, include) -TESTS := output/perf_test #output/map_test1 output/map_test2 output/rcu_test output/txn_test #output/haz_test +TESTS := output/perf_test output/map_test1 output/map_test2 output/rcu_test output/txn_test #output/haz_test OBJS := $(TESTS) -RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c datatype/nstring.c #runtime/hazard.c +RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c runtime/random.c \ + datatype/nstring.c #runtime/hazard.c MAP_SRCS := map/map.c map/list.c map/skiplist.c map/hashtable.c haz_test_SRCS := $(RUNTIME_SRCS) test/haz_test.c diff --git a/runtime/hazard.c b/runtime/hazard.c index 3ebed2e..431c576 100644 --- a/runtime/hazard.c +++ b/runtime/hazard.c @@ -53,8 +53,8 @@ static int search_hazards (void *p, haz_t *hazards, int n) { static void resize_pending (void) { TRACE("H2", "haz_resize_pending", 0, 0); - LOCALIZE_THREAD_LOCAL(tid_, int); - haz_local_t *l = haz_local_ + tid_; + LOCALIZE_THREAD_LOCAL(ThreadId, int); + haz_local_t *l = 
haz_local_ + ThreadId; pending_t *p = nbd_malloc(sizeof(pending_t) * l->pending_size * 2); memcpy(p, l->pending, l->pending_size); nbd_free(l->pending); @@ -66,8 +66,8 @@ void haz_defer_free (void *d, free_t f) { TRACE("H1", "haz_defer_free: %p (%p)", d, f); assert(d); assert(f); - LOCALIZE_THREAD_LOCAL(tid_, int); - haz_local_t *l = haz_local_ + tid_; + LOCALIZE_THREAD_LOCAL(ThreadId, int); + haz_local_t *l = haz_local_ + ThreadId; while (l->pending_count == l->pending_size) { if (l->pending_size == 0) { @@ -131,17 +131,17 @@ haz_t *haz_get_static (int i) { TRACE("H1", "haz_get_static: %p", i, 0); if (i >= STATIC_HAZ_PER_THREAD) return NULL; - LOCALIZE_THREAD_LOCAL(tid_, int); + LOCALIZE_THREAD_LOCAL(ThreadId, int); assert(i < STATIC_HAZ_PER_THREAD); - haz_t *ret = &haz_local_[tid_].static_haz[i]; + haz_t *ret = &haz_local_[ThreadId].static_haz[i]; TRACE("H1", "haz_get_static: returning %p", ret, 0); return ret; } void haz_register_dynamic (haz_t *haz) { TRACE("H1", "haz_register_dynamic: %p", haz, 0); - LOCALIZE_THREAD_LOCAL(tid_, int); - haz_local_t *l = haz_local_ + tid_; + LOCALIZE_THREAD_LOCAL(ThreadId, int); + haz_local_t *l = haz_local_ + ThreadId; if (l->dynamic_size == 0) { int n = MAX_NUM_THREADS * STATIC_HAZ_PER_THREAD; @@ -163,8 +163,8 @@ void haz_register_dynamic (haz_t *haz) { // assumes was registered in the same thread void haz_unregister_dynamic (void **haz) { TRACE("H1", "haz_unregister_dynamic: %p", haz, 0); - LOCALIZE_THREAD_LOCAL(tid_, int); - haz_local_t *l = haz_local_ + tid_; + LOCALIZE_THREAD_LOCAL(ThreadId, int); + haz_local_t *l = haz_local_ + ThreadId; for (int i = 0; i < l->dynamic_count; ++i) { if (l->dynamic[i] == haz) { diff --git a/runtime/lwt.c b/runtime/lwt.c index a986567..47bfceb 100644 --- a/runtime/lwt.c +++ b/runtime/lwt.c @@ -28,35 +28,34 @@ typedef struct lwt_buffer { lwt_record_t x[0]; } lwt_buffer_t; -lwt_buffer_t *lwt_buf_[MAX_NUM_THREADS] = {}; -char flag_state_[256] = {}; -static const char *flags_ = ""; - -void 
lwt_thread_init (int thread_id) -{ - assert(thread_id < MAX_NUM_THREADS); - if (lwt_buf_[thread_id] == NULL) { - lwt_buf_[thread_id] = (lwt_buffer_t *)nbd_malloc(sizeof(lwt_buffer_t) + sizeof(lwt_record_t) * LWT_BUFFER_SIZE); - memset(lwt_buf_[thread_id], 0, sizeof(lwt_buffer_t)); +lwt_buffer_t *TraceBuffer[MAX_NUM_THREADS] = {}; +char TraceLevel[256] = {}; +static const char *TraceSpec = ""; + +void lwt_thread_init (void) { + int thread_index = GET_THREAD_INDEX(); + + if (TraceBuffer[thread_index] == NULL) { + TraceBuffer[thread_index] = + (lwt_buffer_t *)nbd_malloc(sizeof(lwt_buffer_t) + sizeof(lwt_record_t) * LWT_BUFFER_SIZE); + memset(TraceBuffer[thread_index], 0, sizeof(lwt_buffer_t)); } } -void lwt_set_trace_level (const char *flags) -{ +void lwt_set_trace_level (const char *flags) { assert(strlen(flags) % 2 == 0); // a well formed should be an even number of characters long - flags_ = flags; - memset(flag_state_, 0, sizeof(flag_state_)); + TraceSpec = flags; + memset(TraceLevel, 0, sizeof(TraceLevel)); for (int i = 0; flags[i]; i+=2) { - flag_state_[(unsigned)flags[i]] = flags[i+1]; + TraceLevel[(unsigned)flags[i]] = flags[i+1]; } } -static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset) -{ +static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset) { // print the record if its trace category is enabled at a high enough level int flag = r->format >> 56; int level = (r->format >> 48) & 0xFF; - if (flag_state_[(unsigned)flag] >= level) { + if (TraceLevel[(unsigned)flag] >= level) { char s[3] = {flag, level, '\0'}; fprintf(file, "%09llu %d %s ", ((uint64_t)r->timestamp - offset) >> 5, thread_id, s); const char *format = (const char *)(size_t)(r->format & MASK(48)); // strip out the embedded flags @@ -65,18 +64,17 @@ static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t of } } -static void dump_buffer (FILE *file, int thread_id, uint64_t offset) -{ - lwt_buffer_t *tb = 
lwt_buf_[thread_id]; +static void dump_buffer (FILE *file, int thread_index, uint64_t offset) { + lwt_buffer_t *tb = TraceBuffer[thread_index]; assert(tb); if (tb->head > LWT_BUFFER_SIZE) { for (int i = tb->head & LWT_BUFFER_MASK; i < LWT_BUFFER_SIZE; ++i) { - dump_record(file, thread_id, tb->x + i, offset); + dump_record(file, thread_index + 1, tb->x + i, offset); } } for (int i = 0; i < (tb->head & LWT_BUFFER_MASK); ++i) { - dump_record(file, thread_id, tb->x + i, offset); + dump_record(file, thread_index + 1, tb->x + i, offset); } } @@ -84,20 +82,19 @@ void lwt_halt (void) { halt_ = 1; } -void lwt_dump (const char *file_name) -{ +void lwt_dump (const char *file_name) { halt_ = 1; uint64_t offset = (uint64_t)-1; for (int i = 0; i < MAX_NUM_THREADS; ++i) { - if (lwt_buf_[i] != NULL && lwt_buf_[i]->head != 0) { - uint64_t x = lwt_buf_[i]->x[0].timestamp; + if (TraceBuffer[i] != NULL && TraceBuffer[i]->head != 0) { + uint64_t x = TraceBuffer[i]->x[0].timestamp; if (x < offset) { offset = x; } - if (lwt_buf_[i]->head > LWT_BUFFER_SIZE) + if (TraceBuffer[i]->head > LWT_BUFFER_SIZE) { - x = lwt_buf_[i]->x[lwt_buf_[i]->head & LWT_BUFFER_MASK].timestamp; + x = TraceBuffer[i]->x[TraceBuffer[i]->head & LWT_BUFFER_MASK].timestamp; if (x < offset) { offset = x; } @@ -109,7 +106,7 @@ void lwt_dump (const char *file_name) FILE *file = fopen(file_name, "w"); assert(file); for (int i = 0; i < MAX_NUM_THREADS; ++i) { - if (lwt_buf_[i] != NULL) { + if (TraceBuffer[i] != NULL) { dump_buffer(file, i, offset); } } @@ -120,8 +117,7 @@ void lwt_dump (const char *file_name) void lwt_trace_i (uint64_t format, size_t value1, size_t value2) { while (halt_) {} - LOCALIZE_THREAD_LOCAL(tid_, int); - lwt_buffer_t *tb = lwt_buf_[tid_]; + lwt_buffer_t *tb = TraceBuffer[GET_THREAD_INDEX()]; if (tb != NULL) { unsigned int u, l; __asm__ __volatile__("rdtsc" : "=a" (l), "=d" (u)); diff --git a/runtime/mem.c b/runtime/mem.c index 90a22e1..2f55ff4 100644 --- a/runtime/mem.c +++ b/runtime/mem.c @@ 
-74,9 +74,9 @@ static inline header_t *get_header (void *r) { } static void *get_new_region (int block_scale) { - LOCALIZE_THREAD_LOCAL(tid_, int); + int thread_index = GET_THREAD_INDEX(); #ifdef RECYCLE_PAGES - tl_t *tl = &tl_[tid_]; // thread-local data + tl_t *tl = &tl_[thread_index]; // thread-local data if (block_scale <= PAGE_SCALE && tl->free_pages != NULL) { void *region = tl->free_pages; tl->free_pages = tl->free_pages->next; @@ -122,7 +122,7 @@ static void *get_new_region (int block_scale) { TRACE("m1", "get_new_region: header %p (%p)", h, h - headers_); assert(h->scale == 0); h->scale = block_scale; - h->owner = tid_; + h->owner = thread_index; return region; } @@ -148,7 +148,6 @@ void mem_init (void) { void nbd_free (void *x) { TRACE("m1", "nbd_free: block %p page %p", x, (size_t)x & ~MASK(PAGE_SCALE)); ASSERT(x); - LOCALIZE_THREAD_LOCAL(tid_, int); block_t *b = (block_t *)x; header_t *h = get_header(x); int b_scale = h->scale; @@ -164,8 +163,9 @@ void nbd_free (void *x) { #ifndef NDEBUG memset(b, 0xcd, (1ULL << b_scale)); // bear trap #endif - tl_t *tl = &tl_[tid_]; // thread-local data - if (h->owner == tid_) { + int thread_index = GET_THREAD_INDEX(); + tl_t *tl = &tl_[thread_index]; // thread-local data + if (h->owner == thread_index) { TRACE("m1", "nbd_free: private block, old free list head %p", tl->free_list[b_scale], 0); #ifndef RECYCLE_PAGES @@ -264,8 +264,7 @@ void *nbd_malloc (size_t n) { if (EXPECT_FALSE(b_scale < MIN_SCALE)) { b_scale = MIN_SCALE; } if (EXPECT_FALSE(b_scale > MAX_SCALE)) { return NULL; } - LOCALIZE_THREAD_LOCAL(tid_, int); - tl_t *tl = &tl_[tid_]; // thread-local data + tl_t *tl = &tl_[GET_THREAD_INDEX()]; // thread-local data block_t *b = pop_free_list(tl, b_scale); if (b != NULL) { diff --git a/runtime/random.c b/runtime/random.c new file mode 100644 index 0000000..d6d1cf0 --- /dev/null +++ b/runtime/random.c @@ -0,0 +1,79 @@ +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include 
"runtime.h" + +DECLARE_THREAD_LOCAL(rx_, uint32_t); +DECLARE_THREAD_LOCAL(ry_, uint32_t); +DECLARE_THREAD_LOCAL(rz_, uint32_t); +DECLARE_THREAD_LOCAL(rc_, uint32_t); +// Initializes the thread-local slots rx_, ry_, rz_ and rc_ that hold each thread's generator state. +void rnd_init (void) { + INIT_THREAD_LOCAL(rx_); + INIT_THREAD_LOCAL(ry_); + INIT_THREAD_LOCAL(rz_); + INIT_THREAD_LOCAL(rc_); +} + +// Seeds the calling thread's generator state with 16 bytes read from /dev/urandom; exits on failure. +// TODO: put a lock around this so that threads being initialized concurrently don't read the same values +void rnd_thread_init (void) { + int fd = open("/dev/urandom", O_RDONLY); + if (fd == -1) { + perror("Error opening /dev/urandom"); + exit(1); + } + + char buf[16]; + + int n = read(fd, buf, sizeof(buf)); + if (n != (int)sizeof(buf)) { + if (n == -1) { + perror("Error reading from /dev/urandom"); + } + fprintf(stderr, "Could not read enough bytes from /dev/urandom"); + exit(1); + } + close(fd); // done with the descriptor; without this every initialized thread leaks one + uint32_t x, y, z, c; + memcpy(&x, buf + 0, 4); + memcpy(&y, buf + 4, 4); + memcpy(&z, buf + 8, 4); + memcpy(&c, buf + 12, 4); + // NOTE(review): SET_THREAD_LOCAL (tls.h) also assigns to a variable named after its first argument; confirm rx_..rc_ are in scope here + SET_THREAD_LOCAL(rx_, x); + SET_THREAD_LOCAL(ry_, y); + SET_THREAD_LOCAL(rz_, z); + SET_THREAD_LOCAL(rc_, c); // the carry gets its own random word (c), not a second copy of z +} + +// George Marsaglia's KISS generator +// +// Even though this returns 64 bits, this algorithm was only designed to generate 32 bits. +// The upper 32 bits are going to be highly correlated with the lower 32 bits of the next call. 
+uint64_t nbd_rand (void) { + LOCALIZE_THREAD_LOCAL(rx_, unsigned); + LOCALIZE_THREAD_LOCAL(ry_, unsigned); + LOCALIZE_THREAD_LOCAL(rz_, unsigned); + LOCALIZE_THREAD_LOCAL(rc_, unsigned); + + uint32_t rx = 69069 * rx_ + 12345; // linear congruential step + uint32_t ry = ry_; + ry ^= (ry << 13); // three-shift xorshift step + ry ^= (ry >> 17); + ry ^= (ry << 5); + uint64_t t = rz_ * 698769069LL + rc_; // multiply-with-carry step, done in 64 bits + uint64_t r = rx + ry + t; // rx + ry is summed as uint32_t before t is added + + SET_THREAD_LOCAL(rx_, rx); + SET_THREAD_LOCAL(ry_, ry); + SET_THREAD_LOCAL(rz_, t); // read back as unsigned above, so only the low bits of t carry over + SET_THREAD_LOCAL(rc_, t >> 32); // high half of t is the next carry + + return r; +} diff --git a/runtime/rcu.c b/runtime/rcu.c index 12c37a3..0d65a37 100644 --- a/runtime/rcu.c +++ b/runtime/rcu.c @@ -39,36 +39,35 @@ static fifo_t *fifo_alloc(int scale) { return q; } -void rcu_thread_init (int id) { - assert(id < MAX_NUM_THREADS); - if (pending_[id] == NULL) { - pending_[id] = fifo_alloc(RCU_QUEUE_SCALE); +void rcu_thread_init (void) { + int thread_index = GET_THREAD_INDEX(); + if (pending_[thread_index] == NULL) { + pending_[thread_index] = fifo_alloc(RCU_QUEUE_SCALE); (void)SYNC_ADD(&num_threads_, 1); } } void rcu_update (void) { - LOCALIZE_THREAD_LOCAL(tid_, int); - assert(tid_ < num_threads_); - int next_thread_id = (tid_ + 1) % num_threads_; - TRACE("r1", "rcu_update: updating thread %llu", next_thread_id, 0); + int thread_index = GET_THREAD_INDEX(); + int next_thread_index = (thread_index + 1) % num_threads_; + TRACE("r1", "rcu_update: updating thread %llu", next_thread_index, 0); int i; for (i = 0; i < num_threads_; ++i) { - if (i == tid_) + if (i == thread_index) continue; // No need to post an update if the value hasn't changed - if (rcu_[tid_][i] == rcu_last_posted_[tid_][i]) + if (rcu_[thread_index][i] == rcu_last_posted_[thread_index][i]) continue; - uint64_t x = rcu_[tid_][i]; - rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x; + uint64_t x = rcu_[thread_index][i]; + rcu_[next_thread_index][i] = rcu_last_posted_[thread_index][i] = x; TRACE("r2", "rcu_update: posted updated value (%llu) for thread %llu", x, i); } // free - 
fifo_t *q = pending_[tid_]; - while (q->tail != rcu_[tid_][tid_]) { + fifo_t *q = pending_[thread_index]; + while (q->tail != rcu_[thread_index][thread_index]) { uint32_t i = MOD_SCALE(q->tail, q->scale); TRACE("r0", "rcu_update: freeing %p from queue at position %llu", q->x[i], q->tail); nbd_free(q->x[i]); @@ -78,17 +77,18 @@ void rcu_update (void) { void rcu_defer_free (void *x) { assert(x); - LOCALIZE_THREAD_LOCAL(tid_, int); - fifo_t *q = pending_[tid_]; + int thread_index = GET_THREAD_INDEX(); + fifo_t *q = pending_[thread_index]; assert(MOD_SCALE(q->head + 1, q->scale) != MOD_SCALE(q->tail, q->scale)); uint32_t i = MOD_SCALE(q->head, q->scale); q->x[i] = x; TRACE("r0", "rcu_defer_free: put %p on queue at position %llu", x, q->head); q->head++; - if (pending_[tid_]->head - rcu_last_posted_[tid_][tid_] >= RCU_POST_THRESHOLD) { - TRACE("r0", "rcu_defer_free: posting %llu", pending_[tid_]->head, 0); - int next_thread_id = (tid_ + 1) % num_threads_; - rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head; + if (pending_[thread_index]->head - rcu_last_posted_[thread_index][thread_index] >= RCU_POST_THRESHOLD) { + TRACE("r0", "rcu_defer_free: posting %llu", pending_[thread_index]->head, 0); + int next_thread_index = (thread_index + 1) % num_threads_; + rcu_[next_thread_index][thread_index] = pending_[thread_index]->head; + rcu_last_posted_[thread_index][thread_index] = pending_[thread_index]->head; } } diff --git a/runtime/rlocal.h b/runtime/rlocal.h index bb62c76..fef861f 100644 --- a/runtime/rlocal.h +++ b/runtime/rlocal.h @@ -4,9 +4,15 @@ #include "runtime.h" #include "tls.h" +extern DECLARE_THREAD_LOCAL(ThreadId, int); + +#define GET_THREAD_INDEX() ({ LOCALIZE_THREAD_LOCAL(ThreadId, int); assert(ThreadId != 0); ThreadId - 1; }) + void mem_init (void); +void rnd_init (void); -void rcu_thread_init (int thread_id); -void lwt_thread_init (int thread_id); +void rnd_thread_init (void); +void rcu_thread_init (void); +void lwt_thread_init 
(void); #endif//RLOCAL_H diff --git a/runtime/runtime.c b/runtime/runtime.c index fe4a1bd..19024e2 100644 --- a/runtime/runtime.c +++ b/runtime/runtime.c @@ -2,7 +2,6 @@ * Written by Josh Dybnis and released to the public domain, as explained at * http://creativecommons.org/licenses/publicdomain */ -#define _POSIX_C_SOURCE 1 // for rand_r() #include #include #include "common.h" @@ -11,84 +10,26 @@ #include "mem.h" #include "tls.h" -DECLARE_THREAD_LOCAL(tid_, int); -DECLARE_THREAD_LOCAL(rx_, uint32_t); -DECLARE_THREAD_LOCAL(ry_, uint32_t); -DECLARE_THREAD_LOCAL(rz_, uint32_t); -DECLARE_THREAD_LOCAL(rc_, uint32_t); +DECLARE_THREAD_LOCAL(ThreadId, int); -typedef struct thread_info { - int thread_id; - void *(*start_routine)(void *); - void *restrict arg; -} thread_info_t; +static int MaxThreadId = 0; // highest id handed out so far; ids start at 1 and a ThreadId of 0 means "none assigned yet" __attribute__ ((constructor)) void nbd_init (void) { - INIT_THREAD_LOCAL(r); - INIT_THREAD_LOCAL(tid_); - SET_THREAD_LOCAL(tid_, 0); + INIT_THREAD_LOCAL(ThreadId); // the key must exist before any thread reads or sets ThreadId + rnd_init(); mem_init(); - lwt_thread_init(0); - rcu_thread_init(0); - srand((uint32_t)rdtsc()); } -static void *worker (void *arg) { - thread_info_t *ti = (thread_info_t *)arg; - SET_THREAD_LOCAL(tid_, ti->thread_id); - LOCALIZE_THREAD_LOCAL(tid_, int); +void nbd_thread_init (void) { // may be called more than once per thread; the id and RNG seed are set up on the first call only + LOCALIZE_THREAD_LOCAL(ThreadId, int); - SET_THREAD_LOCAL(rx_, rand()); - SET_THREAD_LOCAL(ry_, rand()); - SET_THREAD_LOCAL(rz_, rand()); - SET_THREAD_LOCAL(rc_, rand()); + if (ThreadId == 0) { + int id = __sync_add_and_fetch(&MaxThreadId, 1); // atomic: worker threads run this concurrently. TODO: reuse thread id's of threads that have been destroyed + ASSERT(id <= MAX_NUM_THREADS); + SET_THREAD_LOCAL(ThreadId, id); + rnd_thread_init(); + } - lwt_thread_init(ti->thread_id); - rcu_thread_init(ti->thread_id); - - void *ret = ti->start_routine(ti->arg); - nbd_free(ti); - return ret; -} - -int nbd_thread_create (pthread_t *restrict thread, int thread_id, void *(*start_routine)(void *), void *restrict arg) { - thread_info_t *ti = (thread_info_t *)nbd_malloc(sizeof(thread_info_t)); - ti->thread_id = 
thread_id; - ti->start_routine = start_routine; - ti->arg = arg; - return pthread_create(thread, NULL, worker, ti); -} - -// George Marsaglia's KISS generator -uint64_t nbd_rand (void) { - LOCALIZE_THREAD_LOCAL(rx_, unsigned); - LOCALIZE_THREAD_LOCAL(ry_, unsigned); - LOCALIZE_THREAD_LOCAL(rz_, unsigned); - LOCALIZE_THREAD_LOCAL(rc_, unsigned); - - uint32_t rx = 69069 * rx_ + 12345; - uint32_t ry = ry_; - uint32_t rz = rz_; - ry ^= (ry << 13); - ry ^= (ry >> 17); - ry ^= (ry << 5); - uint64_t t = rz * 698769069LL + rc_; - uint64_t r = rx + ry + (rz = t); - - SET_THREAD_LOCAL(rx_, rx); - SET_THREAD_LOCAL(ry_, ry); - SET_THREAD_LOCAL(rz_, rz); - SET_THREAD_LOCAL(rc_, t >> 32); - - return r; -} - -// Fairly fast random numbers -uint64_t nbd_rand_seed (int i) { - return rdtsc() + -715159705 + i * 129; -} - -int nbd_next_rand (uint64_t *r) { - *r = (*r * 0x5DEECE66DLL + 0xBLL) & MASK(48); - return (*r >> 17) & 0x7FFFFFFF; + lwt_thread_init(); // allocates this thread's trace buffer only if it has none yet + rcu_thread_init(); // allocates this thread's pending queue only if it has none yet } diff --git a/test/map_test1.c b/test/map_test1.c index f05fa3d..11fa5d4 100644 --- a/test/map_test1.c +++ b/test/map_test1.c @@ -21,6 +21,7 @@ static long num_threads_; static map_t *map_; void *worker (void *arg) { + nbd_thread_init(); // Wait for all the worker threads to be ready. 
(void)SYNC_ADD(&wait_, -1); @@ -56,6 +57,7 @@ void *worker (void *arg) { } int main (int argc, char **argv) { + nbd_thread_init(); lwt_set_trace_level("r0m3s3"); char* program_name = argv[0]; @@ -99,7 +101,7 @@ int main (int argc, char **argv) { wait_ = num_threads_; for (int i = 0; i < num_threads_; ++i) { - int rc = nbd_thread_create(thread + i, i, worker, (void*)(size_t)i); + int rc = pthread_create(thread + i, NULL, worker, (void*)(size_t)i); if (rc != 0) { perror("pthread_create"); return rc; } } @@ -109,7 +111,7 @@ int main (int argc, char **argv) { gettimeofday(&tv2, NULL); int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000; - map_print(map_); + map_print(map_, FALSE); printf("Th:%ld Time:%dms\n\n", num_threads_, ms); fflush(stdout); } diff --git a/test/map_test2.c b/test/map_test2.c index 52438ea..f8e0abd 100644 --- a/test/map_test2.c +++ b/test/map_test2.c @@ -132,6 +132,8 @@ void basic_test (CuTest* tc) { } void *add_remove_worker (void *arg) { + nbd_thread_init(); + worker_data_t *wd = (worker_data_t *)arg; map_t *map = wd->map; CuTest* tc = wd->tc; @@ -197,7 +199,7 @@ void concurrent_add_remove_test (CuTest* tc) { wd[i].tc = tc; wd[i].map = map; wd[i].wait = &wait; - int rc = nbd_thread_create(thread + i, i, add_remove_worker, wd + i); + int rc = pthread_create(thread + i, NULL, add_remove_worker, wd + i); if (rc != 0) { perror("nbd_thread_create"); return; } } @@ -207,7 +209,7 @@ void concurrent_add_remove_test (CuTest* tc) { gettimeofday(&tv2, NULL); int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000; - map_print(map); + map_print(map, FALSE); printf("Time:%dms\n", ms); fflush(stdout); @@ -315,6 +317,7 @@ void big_iteration_test (CuTest* tc) { } int main (void) { + nbd_thread_init(); lwt_set_trace_level("r0m3l2t0"); static const map_impl_t *map_types[] = { &MAP_IMPL_LL, &MAP_IMPL_SL, &MAP_IMPL_HT }; diff --git a/test/perf_test.c b/test/perf_test.c index 1ea0bcb..088956e 100644 --- 
a/test/perf_test.c +++ b/test/perf_test.c @@ -27,7 +27,7 @@ static int duration_; #define OP_SELECT_RANGE (1ULL << 20) void *worker (void *arg) { - volatile uint64_t ops = 0; + nbd_thread_init(); // Wait for all the worker threads to be ready. (void)SYNC_ADD(&load_, -1); @@ -44,6 +44,7 @@ void *worker (void *arg) { (void)SYNC_ADD(&start_, -1); do {} while (start_); + uint64_t ops = 0; while (!stop_) { ++ops; map_key_t key = (nbd_rand() & (num_keys_ - 1)) + 1; @@ -77,7 +78,7 @@ uint64_t run_test (void) { pthread_t thread[MAX_NUM_THREADS]; for (int i = 0; i < num_threads_; ++i) { - int rc = nbd_thread_create(thread + i, i, worker, (void*)(size_t)i); + int rc = pthread_create(thread + i, NULL, worker, (void*)(size_t)i); if (rc != 0) { perror("pthread_create"); exit(rc); } } @@ -152,6 +153,7 @@ int main (int argc, char **argv) { get_range_ = (int)((double)OP_SELECT_RANGE / 100 * read_ratio); put_range_ = get_range_ + (int)(((double)OP_SELECT_RANGE - get_range_) / 100 * put_ratio); + nbd_thread_init(); static const map_impl_t *map_types[] = { &MAP_IMPL_HT }; for (int i = 0; i < sizeof(map_types)/sizeof(*map_types); ++i) { #ifdef TEST_STRING_KEYS diff --git a/test/rcu_test.c b/test/rcu_test.c index b2dedaa..b225855 100644 --- a/test/rcu_test.c +++ b/test/rcu_test.c @@ -1,7 +1,7 @@ -#define _POSIX_C_SOURCE 1 // for rand_r #include #include #include +#include #include #include "common.h" #include "runtime.h" @@ -53,8 +53,7 @@ node_t *node_alloc (void) { } void *worker (void *arg) { - int id = (int)(size_t)arg; - unsigned int rand_seed = (unsigned int)id + 1; + nbd_thread_init(); // Wait for all the worker threads to be ready. 
(void)__sync_fetch_and_add(&wait_, -1); @@ -62,7 +61,7 @@ void *worker (void *arg) { int i; for (i = 0; i < NUM_ITERATIONS; ++ i) { - int n = rand_r(&rand_seed); + int n = nbd_rand(); if (n & 0x1) { lifo_aba_push(stk_, node_alloc()); } else { @@ -78,9 +77,10 @@ void *worker (void *arg) { } int main (int argc, char **argv) { + nbd_thread_init(); lwt_set_trace_level("m3r3"); - int num_threads = MAX_NUM_THREADS; + int num_threads = sysconf(_SC_NPROCESSORS_CONF); if (argc == 2) { errno = 0; @@ -104,7 +104,7 @@ int main (int argc, char **argv) { pthread_t thread[num_threads]; for (int i = 0; i < num_threads; ++i) { - int rc = nbd_thread_create(thread + i, i, worker, (void *)(size_t)i); + int rc = pthread_create(thread + i, NULL, worker, (void *)(size_t)i); if (rc != 0) { perror("pthread_create"); return rc; } } for (int i = 0; i < num_threads; ++i) { diff --git a/test/txn_test.c b/test/txn_test.c index 0a5045d..4728ef9 100644 --- a/test/txn_test.c +++ b/test/txn_test.c @@ -25,7 +25,7 @@ void test1 (CuTest* tc) { } int main (void) { - + nbd_thread_init(); lwt_set_trace_level("x3h3"); CuString *output = CuStringNew();