// the dump. It is only included when its specified category is enabled at a trace level greater than or equal to
// the one in <flag>. Categories are case sensitive.
static inline void lwt_trace (const char *flag, const char *format, size_t value1, size_t value2) {
- extern char TraceLevel[256];
- if (EXPECT_FALSE(TraceLevel[(unsigned)flag[0]] >= flag[1])) {
+ extern char flag_state_[256];
+ if (EXPECT_FALSE(flag_state_[(unsigned)flag[0]] >= flag[1])) {
// embed <flags> in <format> so we don't have to make the lwt_record_t any bigger than it already is
uint64_t f = ((uint64_t)(size_t)format | ((uint64_t)flag[0] << 56) | ((uint64_t)flag[1] << 48));
extern void lwt_trace_i (uint64_t format, size_t value1, size_t value2);
*/
#ifndef MEM_H
#define MEM_H
-void *nbd_malloc (size_t n) __attribute__((malloc, alloc_size(1)));
+void *nbd_malloc (size_t n) __attribute__((malloc));
void nbd_free (void *x) __attribute__((nonnull));
#endif//MEM_H
#include <pthread.h>
#include "tls.h"
-void nbd_thread_init (void);
+extern DECLARE_THREAD_LOCAL(tid_, int);
+
+int nbd_thread_create (pthread_t *restrict thread, int thread_id, void *(*start_routine)(void *), void *restrict arg);
uint64_t nbd_rand (void);
+uint64_t nbd_rand_seed (int i);
+int nbd_next_rand (uint64_t *r);
#endif//RUNTIME_H
#define INIT_THREAD_LOCAL(name) \
do { \
- if (pthread_key_create(&name##_KEY, NULL) != 0) { \
- assert("error initializing thread local variable " #name, FALSE); \
- } \
+ if (pthread_key_create(&name##_KEY, NULL) != 0) { assert(FALSE); } \
} while (0)
-#define SET_THREAD_LOCAL(name, value) \
- do { \
- name = value; \
- pthread_setspecific(name##_KEY, (void *)(size_t)value); \
- } while (0);
+#define SET_THREAD_LOCAL(name, value) pthread_setspecific(name##_KEY, (void *)(size_t)value);
#define LOCALIZE_THREAD_LOCAL(name, type) type name = (type)(size_t)pthread_getspecific(name##_KEY)
# Makefile for building programs with whole-program interfile optimization
###################################################################################################
CFLAGS0 := -Wall -Werror -std=gnu99 -lpthread #-m32 -DNBD32
-CFLAGS1 := $(CFLAGS0) -g #-DNDEBUG #-fwhole-program -combine
+CFLAGS1 := $(CFLAGS0) -g -O3 #-DNDEBUG #-fwhole-program -combine
CFLAGS2 := $(CFLAGS1) #-DENABLE_TRACE
CFLAGS3 := $(CFLAGS2) #-DLIST_USE_HAZARD_POINTER
CFLAGS := $(CFLAGS3) #-DNBD_SINGLE_THREADED #-DUSE_SYSTEM_MALLOC #-DTEST_STRING_KEYS
INCS := $(addprefix -I, include)
-TESTS := output/perf_test output/map_test1 output/map_test2 output/rcu_test output/txn_test #output/haz_test
+TESTS := output/perf_test #output/map_test1 output/map_test2 output/rcu_test output/txn_test #output/haz_test
OBJS := $(TESTS)
-RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c runtime/random.c \
- datatype/nstring.c #runtime/hazard.c
+RUNTIME_SRCS := runtime/runtime.c runtime/rcu.c runtime/lwt.c runtime/mem.c datatype/nstring.c #runtime/hazard.c
MAP_SRCS := map/map.c map/list.c map/skiplist.c map/hashtable.c
haz_test_SRCS := $(RUNTIME_SRCS) test/haz_test.c
static void resize_pending (void) {
TRACE("H2", "haz_resize_pending", 0, 0);
- LOCALIZE_THREAD_LOCAL(ThreadId, int);
- haz_local_t *l = haz_local_ + ThreadId;
+ LOCALIZE_THREAD_LOCAL(tid_, int);
+ haz_local_t *l = haz_local_ + tid_;
pending_t *p = nbd_malloc(sizeof(pending_t) * l->pending_size * 2);
memcpy(p, l->pending, l->pending_size);
nbd_free(l->pending);
TRACE("H1", "haz_defer_free: %p (%p)", d, f);
assert(d);
assert(f);
- LOCALIZE_THREAD_LOCAL(ThreadId, int);
- haz_local_t *l = haz_local_ + ThreadId;
+ LOCALIZE_THREAD_LOCAL(tid_, int);
+ haz_local_t *l = haz_local_ + tid_;
while (l->pending_count == l->pending_size) {
if (l->pending_size == 0) {
TRACE("H1", "haz_get_static: %p", i, 0);
if (i >= STATIC_HAZ_PER_THREAD)
return NULL;
- LOCALIZE_THREAD_LOCAL(ThreadId, int);
+ LOCALIZE_THREAD_LOCAL(tid_, int);
assert(i < STATIC_HAZ_PER_THREAD);
- haz_t *ret = &haz_local_[ThreadId].static_haz[i];
+ haz_t *ret = &haz_local_[tid_].static_haz[i];
TRACE("H1", "haz_get_static: returning %p", ret, 0);
return ret;
}
void haz_register_dynamic (haz_t *haz) {
TRACE("H1", "haz_register_dynamic: %p", haz, 0);
- LOCALIZE_THREAD_LOCAL(ThreadId, int);
- haz_local_t *l = haz_local_ + ThreadId;
+ LOCALIZE_THREAD_LOCAL(tid_, int);
+ haz_local_t *l = haz_local_ + tid_;
if (l->dynamic_size == 0) {
int n = MAX_NUM_THREADS * STATIC_HAZ_PER_THREAD;
// assumes <haz> was registered in the same thread
void haz_unregister_dynamic (void **haz) {
TRACE("H1", "haz_unregister_dynamic: %p", haz, 0);
- LOCALIZE_THREAD_LOCAL(ThreadId, int);
- haz_local_t *l = haz_local_ + ThreadId;
+ LOCALIZE_THREAD_LOCAL(tid_, int);
+ haz_local_t *l = haz_local_ + tid_;
for (int i = 0; i < l->dynamic_count; ++i) {
if (l->dynamic[i] == haz) {
lwt_record_t x[0];
} lwt_buffer_t;
-lwt_buffer_t *TraceBuffer[MAX_NUM_THREADS] = {};
-char TraceLevel[256] = {};
-static const char *TraceSpec = "";
-
-void lwt_thread_init (void) {
- int thread_index = GET_THREAD_INDEX();
-
- if (TraceBuffer[thread_index] == NULL) {
- TraceBuffer[thread_index] =
- (lwt_buffer_t *)nbd_malloc(sizeof(lwt_buffer_t) + sizeof(lwt_record_t) * LWT_BUFFER_SIZE);
- memset(TraceBuffer[thread_index], 0, sizeof(lwt_buffer_t));
+lwt_buffer_t *lwt_buf_[MAX_NUM_THREADS] = {};
+char flag_state_[256] = {};
+static const char *flags_ = "";
+
+void lwt_thread_init (int thread_id)
+{
+ assert(thread_id < MAX_NUM_THREADS);
+ if (lwt_buf_[thread_id] == NULL) {
+ lwt_buf_[thread_id] = (lwt_buffer_t *)nbd_malloc(sizeof(lwt_buffer_t) + sizeof(lwt_record_t) * LWT_BUFFER_SIZE);
+ memset(lwt_buf_[thread_id], 0, sizeof(lwt_buffer_t));
}
}
-void lwt_set_trace_level (const char *flags) {
+void lwt_set_trace_level (const char *flags)
+{
assert(strlen(flags) % 2 == 0); // a well formed <flags> should be an even number of characters long
- TraceSpec = flags;
- memset(TraceLevel, 0, sizeof(TraceLevel));
+ flags_ = flags;
+ memset(flag_state_, 0, sizeof(flag_state_));
for (int i = 0; flags[i]; i+=2) {
- TraceLevel[(unsigned)flags[i]] = flags[i+1];
+ flag_state_[(unsigned)flags[i]] = flags[i+1];
}
}
-static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset) {
+static void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset)
+{
// print the record if its trace category is enabled at a high enough level
int flag = r->format >> 56;
int level = (r->format >> 48) & 0xFF;
- if (TraceLevel[(unsigned)flag] >= level) {
+ if (flag_state_[(unsigned)flag] >= level) {
char s[3] = {flag, level, '\0'};
fprintf(file, "%09llu %d %s ", ((uint64_t)r->timestamp - offset) >> 5, thread_id, s);
const char *format = (const char *)(size_t)(r->format & MASK(48)); // strip out the embedded flags
}
}
-static void dump_buffer (FILE *file, int thread_index, uint64_t offset) {
- lwt_buffer_t *tb = TraceBuffer[thread_index];
+static void dump_buffer (FILE *file, int thread_id, uint64_t offset)
+{
+ lwt_buffer_t *tb = lwt_buf_[thread_id];
assert(tb);
if (tb->head > LWT_BUFFER_SIZE) {
for (int i = tb->head & LWT_BUFFER_MASK; i < LWT_BUFFER_SIZE; ++i) {
- dump_record(file, thread_index + 1, tb->x + i, offset);
+ dump_record(file, thread_id, tb->x + i, offset);
}
}
for (int i = 0; i < (tb->head & LWT_BUFFER_MASK); ++i) {
- dump_record(file, thread_index + 1, tb->x + i, offset);
+ dump_record(file, thread_id, tb->x + i, offset);
}
}
halt_ = 1;
}
-void lwt_dump (const char *file_name) {
+void lwt_dump (const char *file_name)
+{
halt_ = 1;
uint64_t offset = (uint64_t)-1;
for (int i = 0; i < MAX_NUM_THREADS; ++i) {
- if (TraceBuffer[i] != NULL && TraceBuffer[i]->head != 0) {
- uint64_t x = TraceBuffer[i]->x[0].timestamp;
+ if (lwt_buf_[i] != NULL && lwt_buf_[i]->head != 0) {
+ uint64_t x = lwt_buf_[i]->x[0].timestamp;
if (x < offset) {
offset = x;
}
- if (TraceBuffer[i]->head > LWT_BUFFER_SIZE)
+ if (lwt_buf_[i]->head > LWT_BUFFER_SIZE)
{
- x = TraceBuffer[i]->x[TraceBuffer[i]->head & LWT_BUFFER_MASK].timestamp;
+ x = lwt_buf_[i]->x[lwt_buf_[i]->head & LWT_BUFFER_MASK].timestamp;
if (x < offset) {
offset = x;
}
FILE *file = fopen(file_name, "w");
assert(file);
for (int i = 0; i < MAX_NUM_THREADS; ++i) {
- if (TraceBuffer[i] != NULL) {
+ if (lwt_buf_[i] != NULL) {
dump_buffer(file, i, offset);
}
}
void lwt_trace_i (uint64_t format, size_t value1, size_t value2) {
while (halt_) {}
- lwt_buffer_t *tb = TraceBuffer[GET_THREAD_INDEX()];
+ LOCALIZE_THREAD_LOCAL(tid_, int);
+ lwt_buffer_t *tb = lwt_buf_[tid_];
if (tb != NULL) {
unsigned int u, l;
__asm__ __volatile__("rdtsc" : "=a" (l), "=d" (u));
}
static void *get_new_region (int block_scale) {
- int thread_index = GET_THREAD_INDEX();
+ LOCALIZE_THREAD_LOCAL(tid_, int);
#ifdef RECYCLE_PAGES
- tl_t *tl = &tl_[thread_index]; // thread-local data
+ tl_t *tl = &tl_[tid_]; // thread-local data
if (block_scale <= PAGE_SCALE && tl->free_pages != NULL) {
void *region = tl->free_pages;
tl->free_pages = tl->free_pages->next;
TRACE("m1", "get_new_region: header %p (%p)", h, h - headers_);
assert(h->scale == 0);
h->scale = block_scale;
- h->owner = thread_index;
+ h->owner = tid_;
return region;
}
void nbd_free (void *x) {
TRACE("m1", "nbd_free: block %p page %p", x, (size_t)x & ~MASK(PAGE_SCALE));
ASSERT(x);
+ LOCALIZE_THREAD_LOCAL(tid_, int);
block_t *b = (block_t *)x;
header_t *h = get_header(x);
int b_scale = h->scale;
#ifndef NDEBUG
memset(b, 0xcd, (1ULL << b_scale)); // bear trap
#endif
- int thread_index = GET_THREAD_INDEX();
- tl_t *tl = &tl_[thread_index]; // thread-local data
- if (h->owner == thread_index) {
+ tl_t *tl = &tl_[tid_]; // thread-local data
+ if (h->owner == tid_) {
TRACE("m1", "nbd_free: private block, old free list head %p", tl->free_list[b_scale], 0);
#ifndef RECYCLE_PAGES
if (EXPECT_FALSE(b_scale < MIN_SCALE)) { b_scale = MIN_SCALE; }
if (EXPECT_FALSE(b_scale > MAX_SCALE)) { return NULL; }
- tl_t *tl = &tl_[GET_THREAD_INDEX()]; // thread-local data
+ LOCALIZE_THREAD_LOCAL(tid_, int);
+ tl_t *tl = &tl_[tid_]; // thread-local data
block_t *b = pop_free_list(tl, b_scale);
if (b != NULL) {
+++ /dev/null
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <pthread.h>
-#include <unistd.h>
-#include <fcntl.h>
-
-#include "common.h"
-#include "runtime.h"
-
-DECLARE_THREAD_LOCAL(rx_, uint32_t);
-DECLARE_THREAD_LOCAL(ry_, uint32_t);
-DECLARE_THREAD_LOCAL(rz_, uint32_t);
-DECLARE_THREAD_LOCAL(rc_, uint32_t);
-
-void rnd_init (void) {
- INIT_THREAD_LOCAL(rx_);
- INIT_THREAD_LOCAL(ry_);
- INIT_THREAD_LOCAL(rz_);
- INIT_THREAD_LOCAL(rc_);
-}
-
-// TODO: put a lock around this so that multiple threads being initialize concurrently don't read
-// the same values from /dev/urandom
-void rnd_thread_init (void) {
- int fd = open("/dev/urandom", O_RDONLY);
- if (fd == -1) {
- perror("Error opening /dev/urandom");
- exit(1);
- }
-
- char buf[16];
-
- int n = read(fd, buf, sizeof(buf));
- if (n != 16) {
- if (n == -1) {
- perror("Error reading from /dev/urandom");
- }
- fprintf(stderr, "Could not read enough bytes from /dev/urandom");
- exit(1);
- }
-
- uint32_t x, y, z, c;
- memcpy(&x, buf + 0, 4);
- memcpy(&y, buf + 4, 4);
- memcpy(&z, buf + 8, 4);
- memcpy(&c, buf + 12, 4);
-
- SET_THREAD_LOCAL(rx_, x);
- SET_THREAD_LOCAL(ry_, y);
- SET_THREAD_LOCAL(rz_, z);
- SET_THREAD_LOCAL(rc_, z);
-}
-
-// George Marsaglia's KISS generator
-//
-// Even though this returns 64 bits, this algorithm was only designed to generate 32 bits.
-// The upper 32 bits is going to be highly correlated with the lower 32 bits of the next call.
-uint64_t nbd_rand (void) {
- LOCALIZE_THREAD_LOCAL(rx_, unsigned);
- LOCALIZE_THREAD_LOCAL(ry_, unsigned);
- LOCALIZE_THREAD_LOCAL(rz_, unsigned);
- LOCALIZE_THREAD_LOCAL(rc_, unsigned);
-
- uint32_t rx = 69069 * rx_ + 12345;
- uint32_t ry = ry_;
- ry ^= (ry << 13);
- ry ^= (ry >> 17);
- ry ^= (ry << 5);
- uint64_t t = rz_ * 698769069LL + rc_;
- uint64_t r = rx + ry + t;
-
- SET_THREAD_LOCAL(rx_, rx);
- SET_THREAD_LOCAL(ry_, ry);
- SET_THREAD_LOCAL(rz_, t);
- SET_THREAD_LOCAL(rc_, t >> 32);
-
- return r;
-}
return q;
}
-void rcu_thread_init (void) {
- int thread_index = GET_THREAD_INDEX();
- if (pending_[thread_index] == NULL) {
- pending_[thread_index] = fifo_alloc(RCU_QUEUE_SCALE);
+void rcu_thread_init (int id) {
+ assert(id < MAX_NUM_THREADS);
+ if (pending_[id] == NULL) {
+ pending_[id] = fifo_alloc(RCU_QUEUE_SCALE);
(void)SYNC_ADD(&num_threads_, 1);
}
}
void rcu_update (void) {
- int thread_index = GET_THREAD_INDEX();
- int next_thread_index = (thread_index + 1) % num_threads_;
- TRACE("r1", "rcu_update: updating thread %llu", next_thread_index, 0);
+ LOCALIZE_THREAD_LOCAL(tid_, int);
+ assert(tid_ < num_threads_);
+ int next_thread_id = (tid_ + 1) % num_threads_;
+ TRACE("r1", "rcu_update: updating thread %llu", next_thread_id, 0);
int i;
for (i = 0; i < num_threads_; ++i) {
- if (i == thread_index)
+ if (i == tid_)
continue;
// No need to post an update if the value hasn't changed
- if (rcu_[thread_index][i] == rcu_last_posted_[thread_index][i])
+ if (rcu_[tid_][i] == rcu_last_posted_[tid_][i])
continue;
- uint64_t x = rcu_[thread_index][i];
- rcu_[next_thread_index][i] = rcu_last_posted_[thread_index][i] = x;
+ uint64_t x = rcu_[tid_][i];
+ rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x;
TRACE("r2", "rcu_update: posted updated value (%llu) for thread %llu", x, i);
}
// free
- fifo_t *q = pending_[thread_index];
- while (q->tail != rcu_[thread_index][thread_index]) {
+ fifo_t *q = pending_[tid_];
+ while (q->tail != rcu_[tid_][tid_]) {
uint32_t i = MOD_SCALE(q->tail, q->scale);
TRACE("r0", "rcu_update: freeing %p from queue at position %llu", q->x[i], q->tail);
nbd_free(q->x[i]);
void rcu_defer_free (void *x) {
assert(x);
- int thread_index = GET_THREAD_INDEX();
- fifo_t *q = pending_[thread_index];
+ LOCALIZE_THREAD_LOCAL(tid_, int);
+ fifo_t *q = pending_[tid_];
assert(MOD_SCALE(q->head + 1, q->scale) != MOD_SCALE(q->tail, q->scale));
uint32_t i = MOD_SCALE(q->head, q->scale);
q->x[i] = x;
TRACE("r0", "rcu_defer_free: put %p on queue at position %llu", x, q->head);
q->head++;
- if (pending_[thread_index]->head - rcu_last_posted_[thread_index][thread_index] >= RCU_POST_THRESHOLD) {
- TRACE("r0", "rcu_defer_free: posting %llu", pending_[thread_index]->head, 0);
- int next_thread_index = (thread_index + 1) % num_threads_;
- rcu_[next_thread_index][thread_index] = pending_[thread_index]->head;
- rcu_last_posted_[thread_index][thread_index] = pending_[thread_index]->head;
+ if (pending_[tid_]->head - rcu_last_posted_[tid_][tid_] >= RCU_POST_THRESHOLD) {
+ TRACE("r0", "rcu_defer_free: posting %llu", pending_[tid_]->head, 0);
+ int next_thread_id = (tid_ + 1) % num_threads_;
+ rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head;
}
}
#include "runtime.h"
#include "tls.h"
-extern DECLARE_THREAD_LOCAL(ThreadId, int);
-
-#define GET_THREAD_INDEX() ({ LOCALIZE_THREAD_LOCAL(ThreadId, int); assert(ThreadId != 0); ThreadId - 1; })
-
void mem_init (void);
-void rnd_init (void);
-void rnd_thread_init (void);
-void rcu_thread_init (void);
-void lwt_thread_init (void);
+void rcu_thread_init (int thread_id);
+void lwt_thread_init (int thread_id);
#endif//RLOCAL_H
* Written by Josh Dybnis and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
+#define _POSIX_C_SOURCE 1 // for rand_r()
#include <stdlib.h>
#include <pthread.h>
#include "common.h"
#include "mem.h"
#include "tls.h"
-DECLARE_THREAD_LOCAL(ThreadId, int);
-static int ThreadIndex
+DECLARE_THREAD_LOCAL(tid_, int);
+DECLARE_THREAD_LOCAL(rx_, uint32_t);
+DECLARE_THREAD_LOCAL(ry_, uint32_t);
+DECLARE_THREAD_LOCAL(rz_, uint32_t);
+DECLARE_THREAD_LOCAL(rc_, uint32_t);
-static int MaxThreadId = 0;
+typedef struct thread_info {
+ int thread_id;
+ void *(*start_routine)(void *);
+ void *restrict arg;
+} thread_info_t;
__attribute__ ((constructor)) void nbd_init (void) {
- rnd_init();
+ // INIT_THREAD_LOCAL(r);
+ INIT_THREAD_LOCAL(tid_);
+ SET_THREAD_LOCAL(tid_, 0);
mem_init();
+ lwt_thread_init(0);
+ rcu_thread_init(0);
+ srand((uint32_t)rdtsc());
}
-void nbd_thread_init (void) {
- LOCALIZE_THREAD_LOCAL(ThreadId, int);
+static void *worker (void *arg) {
+ thread_info_t *ti = (thread_info_t *)arg;
+ SET_THREAD_LOCAL(tid_, ti->thread_id);
+ //LOCALIZE_THREAD_LOCAL(tid_, int);
- if (ThreadId == 0) {
- ++MaxThreadId; // TODO: reuse thread id's of threads that have been destroyed
- ASSERT(MaxThreadId <= MAX_NUM_THREADS);
- SET_THREAD_LOCAL(ThreadId, MaxThreadId);
- rnd_thread_init();
- }
+ SET_THREAD_LOCAL(rx_, rand());
+ SET_THREAD_LOCAL(ry_, rand());
+ SET_THREAD_LOCAL(rz_, rand());
+ SET_THREAD_LOCAL(rc_, rand());
- lwt_thread_init();
- rcu_thread_init();
+ lwt_thread_init(ti->thread_id);
+ rcu_thread_init(ti->thread_id);
+
+ void *ret = ti->start_routine(ti->arg);
+ nbd_free(ti);
+ return ret;
+}
+
+int nbd_thread_create (pthread_t *restrict thread, int thread_id, void *(*start_routine)(void *), void *restrict arg) {
+ thread_info_t *ti = (thread_info_t *)nbd_malloc(sizeof(thread_info_t));
+ ti->thread_id = thread_id;
+ ti->start_routine = start_routine;
+ ti->arg = arg;
+ return pthread_create(thread, NULL, worker, ti);
+}
+
+// George Marsaglia's KISS generator
+uint64_t nbd_rand (void) {
+ LOCALIZE_THREAD_LOCAL(rx_, unsigned);
+ LOCALIZE_THREAD_LOCAL(ry_, unsigned);
+ LOCALIZE_THREAD_LOCAL(rz_, unsigned);
+ LOCALIZE_THREAD_LOCAL(rc_, unsigned);
+
+ uint32_t rx = 69069 * rx_ + 12345;
+ uint32_t ry = ry_;
+ uint32_t rz = rz_;
+ ry ^= (ry << 13);
+ ry ^= (ry >> 17);
+ ry ^= (ry << 5);
+ uint64_t t = rz * 698769069LL + rc_;
+ uint64_t r = rx + ry + (rz = t);
+
+ SET_THREAD_LOCAL(rx_, rx);
+ SET_THREAD_LOCAL(ry_, ry);
+ SET_THREAD_LOCAL(rz_, rz);
+ SET_THREAD_LOCAL(rc_, (unsigned)(t >> 32));
+
+ return r;
+}
+
+// Fairly fast random numbers
+uint64_t nbd_rand_seed (int i) {
+ return rdtsc() + -715159705 + i * 129;
+}
+
+int nbd_next_rand (uint64_t *r) {
+ *r = (*r * 0x5DEECE66DLL + 0xBLL) & MASK(48);
+ return (*r >> 17) & 0x7FFFFFFF;
}
static map_t *map_;
void *worker (void *arg) {
- nbd_thread_init();
// Wait for all the worker threads to be ready.
(void)SYNC_ADD(&wait_, -1);
}
int main (int argc, char **argv) {
- nbd_thread_init();
lwt_set_trace_level("r0m3s3");
char* program_name = argv[0];
wait_ = num_threads_;
for (int i = 0; i < num_threads_; ++i) {
- int rc = pthread_create(thread + i, NULL, worker, (void*)(size_t)i);
+ int rc = nbd_thread_create(thread + i, i, worker, (void*)(size_t)i);
if (rc != 0) { perror("pthread_create"); return rc; }
}
gettimeofday(&tv2, NULL);
int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000;
- map_print(map_, FALSE);
+ map_print(map_);
printf("Th:%ld Time:%dms\n\n", num_threads_, ms);
fflush(stdout);
}
}
void *add_remove_worker (void *arg) {
- nbd_thread_init();
-
worker_data_t *wd = (worker_data_t *)arg;
map_t *map = wd->map;
CuTest* tc = wd->tc;
wd[i].tc = tc;
wd[i].map = map;
wd[i].wait = &wait;
- int rc = pthread_create(thread + i, NULL, add_remove_worker, wd + i);
+ int rc = nbd_thread_create(thread + i, i, add_remove_worker, wd + i);
if (rc != 0) { perror("nbd_thread_create"); return; }
}
gettimeofday(&tv2, NULL);
int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000;
- map_print(map, FALSE);
+ map_print(map);
printf("Time:%dms\n", ms);
fflush(stdout);
}
int main (void) {
- nbd_thread_init();
lwt_set_trace_level("r0m3l2t0");
static const map_impl_t *map_types[] = { &MAP_IMPL_LL, &MAP_IMPL_SL, &MAP_IMPL_HT };
#define OP_SELECT_RANGE (1ULL << 20)
void *worker (void *arg) {
- nbd_thread_init();
+ volatile uint64_t ops = 0;
// Wait for all the worker threads to be ready.
(void)SYNC_ADD(&load_, -1);
(void)SYNC_ADD(&start_, -1);
do {} while (start_);
- uint64_t ops = 0;
while (!stop_) {
++ops;
map_key_t key = (nbd_rand() & (num_keys_ - 1)) + 1;
pthread_t thread[MAX_NUM_THREADS];
for (int i = 0; i < num_threads_; ++i) {
- int rc = pthread_create(thread + i, NULL, worker, (void*)(size_t)i);
+ int rc = nbd_thread_create(thread + i, i, worker, (void*)(size_t)i);
if (rc != 0) { perror("pthread_create"); exit(rc); }
}
get_range_ = (int)((double)OP_SELECT_RANGE / 100 * read_ratio);
put_range_ = get_range_ + (int)(((double)OP_SELECT_RANGE - get_range_) / 100 * put_ratio);
- nbd_thread_init();
static const map_impl_t *map_types[] = { &MAP_IMPL_HT };
for (int i = 0; i < sizeof(map_types)/sizeof(*map_types); ++i) {
#ifdef TEST_STRING_KEYS
+#define _POSIX_C_SOURCE 1 // for rand_r
#include <stdio.h>
#include <errno.h>
#include <pthread.h>
-#include <unistd.h>
#include <sys/time.h>
#include "common.h"
#include "runtime.h"
}
void *worker (void *arg) {
- nbd_thread_init();
+ int id = (int)(size_t)arg;
+ unsigned int rand_seed = (unsigned int)id + 1;
// Wait for all the worker threads to be ready.
(void)__sync_fetch_and_add(&wait_, -1);
int i;
for (i = 0; i < NUM_ITERATIONS; ++ i) {
- int n = nbd_rand();
+ int n = rand_r(&rand_seed);
if (n & 0x1) {
lifo_aba_push(stk_, node_alloc());
} else {
}
int main (int argc, char **argv) {
- nbd_thread_init();
lwt_set_trace_level("m3r3");
- int num_threads = sysconf(_SC_NPROCESSORS_CONF);
+ int num_threads = MAX_NUM_THREADS;
if (argc == 2)
{
errno = 0;
pthread_t thread[num_threads];
for (int i = 0; i < num_threads; ++i) {
- int rc = pthread_create(thread + i, NULL, worker, (void *)(size_t)i);
+ int rc = nbd_thread_create(thread + i, i, worker, (void *)(size_t)i);
if (rc != 0) { perror("pthread_create"); return rc; }
}
for (int i = 0; i < num_threads; ++i) {
}
int main (void) {
- nbd_thread_init();
+
lwt_set_trace_level("x3h3");
CuString *output = CuStringNew();