// the dump. It is only included when its specified category is enabled at a trace level greater than or equal to
// the one in <flag>. Categories are case sensitive.
static inline void lwt_trace (const char *flag, const char *format, size_t value1, size_t value2) {
- extern char flag_state_[256];
- if (EXPECT_FALSE(flag_state_[(unsigned)flag[0]] >= flag[1])) {
+ extern char TraceLevel[256];
+ if (EXPECT_FALSE(TraceLevel[(unsigned)flag[0]] >= flag[1])) {
// embed <flags> in <format> so we don't have to make the lwt_record_t any bigger than it already is
uint64_t f = ((uint64_t)(size_t)format | ((uint64_t)flag[0] << 56) | ((uint64_t)flag[1] << 48));
extern void lwt_trace_i (uint64_t format, size_t value1, size_t value2);
#include <pthread.h>
#include "tls.h"
-extern DECLARE_THREAD_LOCAL(tid_, int);
-
-int nbd_thread_create (pthread_t *restrict thread, int thread_id, void *(*start_routine)(void *), void *restrict arg);
+void nbd_thread_init (void);
uint64_t nbd_rand (void);
-uint64_t nbd_rand_seed (int i);
-int nbd_next_rand (uint64_t *r);
#endif//RUNTIME_H
#define INIT_THREAD_LOCAL(name) \
do { \
- if (pthread_key_create(&name##_KEY, NULL) != 0) { assert(FALSE); } \
+ if (pthread_key_create(&name##_KEY, NULL) != 0) { \
+ assert("error initializing thread local variable " #name, FALSE); \
+ } \
} while (0)
-#define SET_THREAD_LOCAL(name, value) pthread_setspecific(name##_KEY, (void *)(size_t)value);
+#define SET_THREAD_LOCAL(name, value) \
+ do { \
+ name = value; \
+ pthread_setspecific(name##_KEY, (void *)(size_t)value); \
+ } while (0);
#define LOCALIZE_THREAD_LOCAL(name, type) type name = (type)(size_t)pthread_getspecific(name##_KEY)
# Makefile for building programs with whole-program interfile optimization
###################################################################################################
CFLAGS0 := -Wall -Werror -std=gnu99 -lpthread #-m32 -DNBD32
-CFLAGS1 := $(CFLAGS0) -g -O3 #-DNDEBUG #-fwhole-program -combine
+CFLAGS1 := $(CFLAGS0) -g #-DNDEBUG #-fwhole-program -combine
CFLAGS2 := $(CFLAGS1) #-DENABLE_TRACE
CFLAGS3 := $(CFLAGS2) #-DLIST_USE_HAZARD_POINTER
CFLAGS := $(CFLAGS3) #-DNBD_SINGLE_THREADED #-DUSE_SYSTEM_MALLOC #-DTEST_STRING_KEYS
INCS := $(addprefix -I, include)
-TESTS := output/perf_test #output/map_test1 output/map_test2 output/rcu_test output/txn_test #output/haz_test
+TESTS := output/perf_test output/map_test1 output/map_test2 output/rcu_test output/txn_test #output/haz_test
OBJS := $(TESTS)
-RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c datatype/nstring.c #runtime/hazard.c
+RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c runtime/random.c \
+ datatype/nstring.c #runtime/hazard.c
MAP_SRCS := map/map.c map/list.c map/skiplist.c map/hashtable.c
haz_test_SRCS := $(RUNTIME_SRCS) test/haz_test.c
static void resize_pending (void) {
TRACE("H2", "haz_resize_pending", 0, 0);
- LOCALIZE_THREAD_LOCAL(tid_, int);
- haz_local_t *l = haz_local_ + tid_;
+ LOCALIZE_THREAD_LOCAL(ThreadId, int);
+ haz_local_t *l = haz_local_ + ThreadId;
pending_t *p = nbd_malloc(sizeof(pending_t) * l->pending_size * 2);
memcpy(p, l->pending, l->pending_size);
nbd_free(l->pending);
TRACE("H1", "haz_defer_free: %p (%p)", d, f);
assert(d);
assert(f);
- LOCALIZE_THREAD_LOCAL(tid_, int);
- haz_local_t *l = haz_local_ + tid_;
+ LOCALIZE_THREAD_LOCAL(ThreadId, int);
+ haz_local_t *l = haz_local_ + ThreadId;
while (l->pending_count == l->pending_size) {
if (l->pending_size == 0) {
TRACE("H1", "haz_get_static: %p", i, 0);
if (i >= STATIC_HAZ_PER_THREAD)
return NULL;
- LOCALIZE_THREAD_LOCAL(tid_, int);
+ LOCALIZE_THREAD_LOCAL(ThreadId, int);
assert(i < STATIC_HAZ_PER_THREAD);
- haz_t *ret = &haz_local_[tid_].static_haz[i];
+ haz_t *ret = &haz_local_[ThreadId].static_haz[i];
TRACE("H1", "haz_get_static: returning %p", ret, 0);
return ret;
}
void haz_register_dynamic (haz_t *haz) {
TRACE("H1", "haz_register_dynamic: %p", haz, 0);
- LOCALIZE_THREAD_LOCAL(tid_, int);
- haz_local_t *l = haz_local_ + tid_;
+ LOCALIZE_THREAD_LOCAL(ThreadId, int);
+ haz_local_t *l = haz_local_ + ThreadId;
if (l->dynamic_size == 0) {
int n = MAX_NUM_THREADS * STATIC_HAZ_PER_THREAD;
// assumes <haz> was registered in the same thread
void haz_unregister_dynamic (void **haz) {
TRACE("H1", "haz_unregister_dynamic: %p", haz, 0);
- LOCALIZE_THREAD_LOCAL(tid_, int);
- haz_local_t *l = haz_local_ + tid_;
+ LOCALIZE_THREAD_LOCAL(ThreadId, int);
+ haz_local_t *l = haz_local_ + ThreadId;
for (int i = 0; i < l->dynamic_count; ++i) {
if (l->dynamic[i] == haz) {
lwt_record_t x[0];
} lwt_buffer_t;
-lwt_buffer_t *lwt_buf_[MAX_NUM_THREADS] = {};
-char flag_state_[256] = {};
-static const char *flags_ = "";
-
-void lwt_thread_init (int thread_id)
-{
- assert(thread_id < MAX_NUM_THREADS);
- if (lwt_buf_[thread_id] == NULL) {
- lwt_buf_[thread_id] = (lwt_buffer_t *)nbd_malloc(sizeof(lwt_buffer_t) + sizeof(lwt_record_t) * LWT_BUFFER_SIZE);
- memset(lwt_buf_[thread_id], 0, sizeof(lwt_buffer_t));
+lwt_buffer_t *TraceBuffer[MAX_NUM_THREADS] = {};
+char TraceLevel[256] = {};
+static const char *TraceSpec = "";
+
+void lwt_thread_init (void) {
+ int thread_index = GET_THREAD_INDEX();
+
+ if (TraceBuffer[thread_index] == NULL) {
+ TraceBuffer[thread_index] =
+ (lwt_buffer_t *)nbd_malloc(sizeof(lwt_buffer_t) + sizeof(lwt_record_t) * LWT_BUFFER_SIZE);
+ memset(TraceBuffer[thread_index], 0, sizeof(lwt_buffer_t));
}
}
-void lwt_set_trace_level (const char *flags)
-{
+void lwt_set_trace_level (const char *flags) {
assert(strlen(flags) % 2 == 0); // a well formed <flags> should be an even number of characters long
- flags_ = flags;
- memset(flag_state_, 0, sizeof(flag_state_));
+ TraceSpec = flags;
+ memset(TraceLevel, 0, sizeof(TraceLevel));
for (int i = 0; flags[i]; i+=2) {
- flag_state_[(unsigned)flags[i]] = flags[i+1];
+ TraceLevel[(unsigned)flags[i]] = flags[i+1];
}
}
-static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset)
-{
+static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset) {
// print the record if its trace category is enabled at a high enough level
int flag = r->format >> 56;
int level = (r->format >> 48) & 0xFF;
- if (flag_state_[(unsigned)flag] >= level) {
+ if (TraceLevel[(unsigned)flag] >= level) {
char s[3] = {flag, level, '\0'};
fprintf(file, "%09llu %d %s ", ((uint64_t)r->timestamp - offset) >> 5, thread_id, s);
const char *format = (const char *)(size_t)(r->format & MASK(48)); // strip out the embedded flags
}
}
-static void dump_buffer (FILE *file, int thread_id, uint64_t offset)
-{
- lwt_buffer_t *tb = lwt_buf_[thread_id];
+static void dump_buffer (FILE *file, int thread_index, uint64_t offset) {
+ lwt_buffer_t *tb = TraceBuffer[thread_index];
assert(tb);
if (tb->head > LWT_BUFFER_SIZE) {
for (int i = tb->head & LWT_BUFFER_MASK; i < LWT_BUFFER_SIZE; ++i) {
- dump_record(file, thread_id, tb->x + i, offset);
+ dump_record(file, thread_index + 1, tb->x + i, offset);
}
}
for (int i = 0; i < (tb->head & LWT_BUFFER_MASK); ++i) {
- dump_record(file, thread_id, tb->x + i, offset);
+ dump_record(file, thread_index + 1, tb->x + i, offset);
}
}
halt_ = 1;
}
-void lwt_dump (const char *file_name)
-{
+void lwt_dump (const char *file_name) {
halt_ = 1;
uint64_t offset = (uint64_t)-1;
for (int i = 0; i < MAX_NUM_THREADS; ++i) {
- if (lwt_buf_[i] != NULL && lwt_buf_[i]->head != 0) {
- uint64_t x = lwt_buf_[i]->x[0].timestamp;
+ if (TraceBuffer[i] != NULL && TraceBuffer[i]->head != 0) {
+ uint64_t x = TraceBuffer[i]->x[0].timestamp;
if (x < offset) {
offset = x;
}
- if (lwt_buf_[i]->head > LWT_BUFFER_SIZE)
+ if (TraceBuffer[i]->head > LWT_BUFFER_SIZE)
{
- x = lwt_buf_[i]->x[lwt_buf_[i]->head & LWT_BUFFER_MASK].timestamp;
+ x = TraceBuffer[i]->x[TraceBuffer[i]->head & LWT_BUFFER_MASK].timestamp;
if (x < offset) {
offset = x;
}
FILE *file = fopen(file_name, "w");
assert(file);
for (int i = 0; i < MAX_NUM_THREADS; ++i) {
- if (lwt_buf_[i] != NULL) {
+ if (TraceBuffer[i] != NULL) {
dump_buffer(file, i, offset);
}
}
void lwt_trace_i (uint64_t format, size_t value1, size_t value2) {
while (halt_) {}
- LOCALIZE_THREAD_LOCAL(tid_, int);
- lwt_buffer_t *tb = lwt_buf_[tid_];
+ lwt_buffer_t *tb = TraceBuffer[GET_THREAD_INDEX()];
if (tb != NULL) {
unsigned int u, l;
__asm__ __volatile__("rdtsc" : "=a" (l), "=d" (u));
}
static void *get_new_region (int block_scale) {
- LOCALIZE_THREAD_LOCAL(tid_, int);
+ int thread_index = GET_THREAD_INDEX();
#ifdef RECYCLE_PAGES
- tl_t *tl = &tl_[tid_]; // thread-local data
+ tl_t *tl = &tl_[thread_index]; // thread-local data
if (block_scale <= PAGE_SCALE && tl->free_pages != NULL) {
void *region = tl->free_pages;
tl->free_pages = tl->free_pages->next;
TRACE("m1", "get_new_region: header %p (%p)", h, h - headers_);
assert(h->scale == 0);
h->scale = block_scale;
- h->owner = tid_;
+ h->owner = thread_index;
return region;
}
void nbd_free (void *x) {
TRACE("m1", "nbd_free: block %p page %p", x, (size_t)x & ~MASK(PAGE_SCALE));
ASSERT(x);
- LOCALIZE_THREAD_LOCAL(tid_, int);
block_t *b = (block_t *)x;
header_t *h = get_header(x);
int b_scale = h->scale;
#ifndef NDEBUG
memset(b, 0xcd, (1ULL << b_scale)); // bear trap
#endif
- tl_t *tl = &tl_[tid_]; // thread-local data
- if (h->owner == tid_) {
+ int thread_index = GET_THREAD_INDEX();
+ tl_t *tl = &tl_[thread_index]; // thread-local data
+ if (h->owner == thread_index) {
TRACE("m1", "nbd_free: private block, old free list head %p", tl->free_list[b_scale], 0);
#ifndef RECYCLE_PAGES
if (EXPECT_FALSE(b_scale < MIN_SCALE)) { b_scale = MIN_SCALE; }
if (EXPECT_FALSE(b_scale > MAX_SCALE)) { return NULL; }
- LOCALIZE_THREAD_LOCAL(tid_, int);
- tl_t *tl = &tl_[tid_]; // thread-local data
+ tl_t *tl = &tl_[GET_THREAD_INDEX()]; // thread-local data
block_t *b = pop_free_list(tl, b_scale);
if (b != NULL) {
--- /dev/null
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+#include "common.h"
+#include "runtime.h"
+
+DECLARE_THREAD_LOCAL(rx_, uint32_t);
+DECLARE_THREAD_LOCAL(ry_, uint32_t);
+DECLARE_THREAD_LOCAL(rz_, uint32_t);
+DECLARE_THREAD_LOCAL(rc_, uint32_t);
+
+void rnd_init (void) {
+ INIT_THREAD_LOCAL(rx_);
+ INIT_THREAD_LOCAL(ry_);
+ INIT_THREAD_LOCAL(rz_);
+ INIT_THREAD_LOCAL(rc_);
+}
+
+// TODO: put a lock around this so that multiple threads being initialize concurrently don't read
+// the same values from /dev/urandom
+void rnd_thread_init (void) {
+ int fd = open("/dev/urandom", O_RDONLY);
+ if (fd == -1) {
+ perror("Error opening /dev/urandom");
+ exit(1);
+ }
+
+ char buf[16];
+
+ int n = read(fd, buf, sizeof(buf));
+ if (n != 16) {
+ if (n == -1) {
+ perror("Error reading from /dev/urandom");
+ }
+ fprintf(stderr, "Could not read enough bytes from /dev/urandom");
+ exit(1);
+ }
+
+ uint32_t x, y, z, c;
+ memcpy(&x, buf + 0, 4);
+ memcpy(&y, buf + 4, 4);
+ memcpy(&z, buf + 8, 4);
+ memcpy(&c, buf + 12, 4);
+
+ SET_THREAD_LOCAL(rx_, x);
+ SET_THREAD_LOCAL(ry_, y);
+ SET_THREAD_LOCAL(rz_, z);
+ SET_THREAD_LOCAL(rc_, z);
+}
+
+// George Marsaglia's KISS generator
+//
+// Even though this returns 64 bits, this algorithm was only designed to generate 32 bits.
+// The upper 32 bits is going to be highly correlated with the lower 32 bits of the next call.
+uint64_t nbd_rand (void) {
+ LOCALIZE_THREAD_LOCAL(rx_, unsigned);
+ LOCALIZE_THREAD_LOCAL(ry_, unsigned);
+ LOCALIZE_THREAD_LOCAL(rz_, unsigned);
+ LOCALIZE_THREAD_LOCAL(rc_, unsigned);
+
+ uint32_t rx = 69069 * rx_ + 12345;
+ uint32_t ry = ry_;
+ ry ^= (ry << 13);
+ ry ^= (ry >> 17);
+ ry ^= (ry << 5);
+ uint64_t t = rz_ * 698769069LL + rc_;
+ uint64_t r = rx + ry + t;
+
+ SET_THREAD_LOCAL(rx_, rx);
+ SET_THREAD_LOCAL(ry_, ry);
+ SET_THREAD_LOCAL(rz_, t);
+ SET_THREAD_LOCAL(rc_, t >> 32);
+
+ return r;
+}
return q;
}
-void rcu_thread_init (int id) {
- assert(id < MAX_NUM_THREADS);
- if (pending_[id] == NULL) {
- pending_[id] = fifo_alloc(RCU_QUEUE_SCALE);
+void rcu_thread_init (void) {
+ int thread_index = GET_THREAD_INDEX();
+ if (pending_[thread_index] == NULL) {
+ pending_[thread_index] = fifo_alloc(RCU_QUEUE_SCALE);
(void)SYNC_ADD(&num_threads_, 1);
}
}
void rcu_update (void) {
- LOCALIZE_THREAD_LOCAL(tid_, int);
- assert(tid_ < num_threads_);
- int next_thread_id = (tid_ + 1) % num_threads_;
- TRACE("r1", "rcu_update: updating thread %llu", next_thread_id, 0);
+ int thread_index = GET_THREAD_INDEX();
+ int next_thread_index = (thread_index + 1) % num_threads_;
+ TRACE("r1", "rcu_update: updating thread %llu", next_thread_index, 0);
int i;
for (i = 0; i < num_threads_; ++i) {
- if (i == tid_)
+ if (i == thread_index)
continue;
// No need to post an update if the value hasn't changed
- if (rcu_[tid_][i] == rcu_last_posted_[tid_][i])
+ if (rcu_[thread_index][i] == rcu_last_posted_[thread_index][i])
continue;
- uint64_t x = rcu_[tid_][i];
- rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x;
+ uint64_t x = rcu_[thread_index][i];
+ rcu_[next_thread_index][i] = rcu_last_posted_[thread_index][i] = x;
TRACE("r2", "rcu_update: posted updated value (%llu) for thread %llu", x, i);
}
// free
- fifo_t *q = pending_[tid_];
- while (q->tail != rcu_[tid_][tid_]) {
+ fifo_t *q = pending_[thread_index];
+ while (q->tail != rcu_[thread_index][thread_index]) {
uint32_t i = MOD_SCALE(q->tail, q->scale);
TRACE("r0", "rcu_update: freeing %p from queue at position %llu", q->x[i], q->tail);
nbd_free(q->x[i]);
void rcu_defer_free (void *x) {
assert(x);
- LOCALIZE_THREAD_LOCAL(tid_, int);
- fifo_t *q = pending_[tid_];
+ int thread_index = GET_THREAD_INDEX();
+ fifo_t *q = pending_[thread_index];
assert(MOD_SCALE(q->head + 1, q->scale) != MOD_SCALE(q->tail, q->scale));
uint32_t i = MOD_SCALE(q->head, q->scale);
q->x[i] = x;
TRACE("r0", "rcu_defer_free: put %p on queue at position %llu", x, q->head);
q->head++;
- if (pending_[tid_]->head - rcu_last_posted_[tid_][tid_] >= RCU_POST_THRESHOLD) {
- TRACE("r0", "rcu_defer_free: posting %llu", pending_[tid_]->head, 0);
- int next_thread_id = (tid_ + 1) % num_threads_;
- rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head;
+ if (pending_[thread_index]->head - rcu_last_posted_[thread_index][thread_index] >= RCU_POST_THRESHOLD) {
+ TRACE("r0", "rcu_defer_free: posting %llu", pending_[thread_index]->head, 0);
+ int next_thread_index = (thread_index + 1) % num_threads_;
+ rcu_[next_thread_index][thread_index] = pending_[thread_index]->head;
+ rcu_last_posted_[thread_index][thread_index] = pending_[thread_index]->head;
}
}
#include "runtime.h"
#include "tls.h"
+extern DECLARE_THREAD_LOCAL(ThreadId, int);
+
+#define GET_THREAD_INDEX() ({ LOCALIZE_THREAD_LOCAL(ThreadId, int); assert(ThreadId != 0); ThreadId - 1; })
+
void mem_init (void);
+void rnd_init (void);
-void rcu_thread_init (int thread_id);
-void lwt_thread_init (int thread_id);
+void rnd_thread_init (void);
+void rcu_thread_init (void);
+void lwt_thread_init (void);
#endif//RLOCAL_H
* Written by Josh Dybnis and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
-#define _POSIX_C_SOURCE 1 // for rand_r()
#include <stdlib.h>
#include <pthread.h>
#include "common.h"
#include "mem.h"
#include "tls.h"
-DECLARE_THREAD_LOCAL(tid_, int);
-DECLARE_THREAD_LOCAL(rx_, uint32_t);
-DECLARE_THREAD_LOCAL(ry_, uint32_t);
-DECLARE_THREAD_LOCAL(rz_, uint32_t);
-DECLARE_THREAD_LOCAL(rc_, uint32_t);
+DECLARE_THREAD_LOCAL(ThreadId, int);
+static int ThreadIndex
-typedef struct thread_info {
- int thread_id;
- void *(*start_routine)(void *);
- void *restrict arg;
-} thread_info_t;
+static int MaxThreadId = 0;
__attribute__ ((constructor)) void nbd_init (void) {
- INIT_THREAD_LOCAL(r);
- INIT_THREAD_LOCAL(tid_);
- SET_THREAD_LOCAL(tid_, 0);
+ rnd_init();
mem_init();
- lwt_thread_init(0);
- rcu_thread_init(0);
- srand((uint32_t)rdtsc());
}
-static void *worker (void *arg) {
- thread_info_t *ti = (thread_info_t *)arg;
- SET_THREAD_LOCAL(tid_, ti->thread_id);
- LOCALIZE_THREAD_LOCAL(tid_, int);
+void nbd_thread_init (void) {
+ LOCALIZE_THREAD_LOCAL(ThreadId, int);
- SET_THREAD_LOCAL(rx_, rand());
- SET_THREAD_LOCAL(ry_, rand());
- SET_THREAD_LOCAL(rz_, rand());
- SET_THREAD_LOCAL(rc_, rand());
+ if (ThreadId == 0) {
+ ++MaxThreadId; // TODO: reuse thread id's of threads that have been destroyed
+ ASSERT(MaxThreadId <= MAX_NUM_THREADS);
+ SET_THREAD_LOCAL(ThreadId, MaxThreadId);
+ rnd_thread_init();
+ }
- lwt_thread_init(ti->thread_id);
- rcu_thread_init(ti->thread_id);
-
- void *ret = ti->start_routine(ti->arg);
- nbd_free(ti);
- return ret;
-}
-
-int nbd_thread_create (pthread_t *restrict thread, int thread_id, void *(*start_routine)(void *), void *restrict arg) {
- thread_info_t *ti = (thread_info_t *)nbd_malloc(sizeof(thread_info_t));
- ti->thread_id = thread_id;
- ti->start_routine = start_routine;
- ti->arg = arg;
- return pthread_create(thread, NULL, worker, ti);
-}
-
-// George Marsaglia's KISS generator
-uint64_t nbd_rand (void) {
- LOCALIZE_THREAD_LOCAL(rx_, unsigned);
- LOCALIZE_THREAD_LOCAL(ry_, unsigned);
- LOCALIZE_THREAD_LOCAL(rz_, unsigned);
- LOCALIZE_THREAD_LOCAL(rc_, unsigned);
-
- uint32_t rx = 69069 * rx_ + 12345;
- uint32_t ry = ry_;
- uint32_t rz = rz_;
- ry ^= (ry << 13);
- ry ^= (ry >> 17);
- ry ^= (ry << 5);
- uint64_t t = rz * 698769069LL + rc_;
- uint64_t r = rx + ry + (rz = t);
-
- SET_THREAD_LOCAL(rx_, rx);
- SET_THREAD_LOCAL(ry_, ry);
- SET_THREAD_LOCAL(rz_, rz);
- SET_THREAD_LOCAL(rc_, t >> 32);
-
- return r;
-}
-
-// Fairly fast random numbers
-uint64_t nbd_rand_seed (int i) {
- return rdtsc() + -715159705 + i * 129;
-}
-
-int nbd_next_rand (uint64_t *r) {
- *r = (*r * 0x5DEECE66DLL + 0xBLL) & MASK(48);
- return (*r >> 17) & 0x7FFFFFFF;
+ lwt_thread_init();
+ rcu_thread_init();
}
static map_t *map_;
void *worker (void *arg) {
+ nbd_thread_init();
// Wait for all the worker threads to be ready.
(void)SYNC_ADD(&wait_, -1);
}
int main (int argc, char **argv) {
+ nbd_thread_init();
lwt_set_trace_level("r0m3s3");
char* program_name = argv[0];
wait_ = num_threads_;
for (int i = 0; i < num_threads_; ++i) {
- int rc = nbd_thread_create(thread + i, i, worker, (void*)(size_t)i);
+ int rc = pthread_create(thread + i, NULL, worker, (void*)(size_t)i);
if (rc != 0) { perror("pthread_create"); return rc; }
}
gettimeofday(&tv2, NULL);
int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000;
- map_print(map_);
+ map_print(map_, FALSE);
printf("Th:%ld Time:%dms\n\n", num_threads_, ms);
fflush(stdout);
}
}
void *add_remove_worker (void *arg) {
+ nbd_thread_init();
+
worker_data_t *wd = (worker_data_t *)arg;
map_t *map = wd->map;
CuTest* tc = wd->tc;
wd[i].tc = tc;
wd[i].map = map;
wd[i].wait = &wait;
- int rc = nbd_thread_create(thread + i, i, add_remove_worker, wd + i);
+ int rc = pthread_create(thread + i, NULL, add_remove_worker, wd + i);
if (rc != 0) { perror("nbd_thread_create"); return; }
}
gettimeofday(&tv2, NULL);
int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000;
- map_print(map);
+ map_print(map, FALSE);
printf("Time:%dms\n", ms);
fflush(stdout);
}
int main (void) {
+ nbd_thread_init();
lwt_set_trace_level("r0m3l2t0");
static const map_impl_t *map_types[] = { &MAP_IMPL_LL, &MAP_IMPL_SL, &MAP_IMPL_HT };
#define OP_SELECT_RANGE (1ULL << 20)
void *worker (void *arg) {
- volatile uint64_t ops = 0;
+ nbd_thread_init();
// Wait for all the worker threads to be ready.
(void)SYNC_ADD(&load_, -1);
(void)SYNC_ADD(&start_, -1);
do {} while (start_);
+ uint64_t ops = 0;
while (!stop_) {
++ops;
map_key_t key = (nbd_rand() & (num_keys_ - 1)) + 1;
pthread_t thread[MAX_NUM_THREADS];
for (int i = 0; i < num_threads_; ++i) {
- int rc = nbd_thread_create(thread + i, i, worker, (void*)(size_t)i);
+ int rc = pthread_create(thread + i, NULL, worker, (void*)(size_t)i);
if (rc != 0) { perror("pthread_create"); exit(rc); }
}
get_range_ = (int)((double)OP_SELECT_RANGE / 100 * read_ratio);
put_range_ = get_range_ + (int)(((double)OP_SELECT_RANGE - get_range_) / 100 * put_ratio);
+ nbd_thread_init();
static const map_impl_t *map_types[] = { &MAP_IMPL_HT };
for (int i = 0; i < sizeof(map_types)/sizeof(*map_types); ++i) {
#ifdef TEST_STRING_KEYS
-#define _POSIX_C_SOURCE 1 // for rand_r
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
+#include <unistd.h>
#include <sys/time.h>
#include "common.h"
#include "runtime.h"
}
void *worker (void *arg) {
- int id = (int)(size_t)arg;
- unsigned int rand_seed = (unsigned int)id + 1;
+ nbd_thread_init();
// Wait for all the worker threads to be ready.
(void)__sync_fetch_and_add(&wait_, -1);
int i;
for (i = 0; i < NUM_ITERATIONS; ++ i) {
- int n = rand_r(&rand_seed);
+ int n = nbd_rand();
if (n & 0x1) {
lifo_aba_push(stk_, node_alloc());
} else {
}
int main (int argc, char **argv) {
+ nbd_thread_init();
lwt_set_trace_level("m3r3");
- int num_threads = MAX_NUM_THREADS;
+ int num_threads = sysconf(_SC_NPROCESSORS_CONF);
if (argc == 2)
{
errno = 0;
pthread_t thread[num_threads];
for (int i = 0; i < num_threads; ++i) {
- int rc = nbd_thread_create(thread + i, i, worker, (void *)(size_t)i);
+ int rc = pthread_create(thread + i, NULL, worker, (void *)(size_t)i);
if (rc != 0) { perror("pthread_create"); return rc; }
}
for (int i = 0; i < num_threads; ++i) {
}
int main (void) {
-
+ nbd_thread_init();
lwt_set_trace_level("x3h3");
CuString *output = CuStringNew();