X-Git-Url: https://pd.if.org/git/?p=nbds;a=blobdiff_plain;f=runtime%2Frcu.c;h=0d65a37f6dda8bca88b9e40e634a663b7637f776;hp=897bdffdedd0e70cbec5351686a5dca210c27ba6;hb=HEAD;hpb=d791fc64b23a9c0a3ed90aa60571344d7735aee9 diff --git a/runtime/rcu.c b/runtime/rcu.c index 897bdff..0d65a37 100644 --- a/runtime/rcu.c +++ b/runtime/rcu.c @@ -1,4 +1,4 @@ -/* +/* * Written by Josh Dybnis and released to the public domain, as explained at * http://creativecommons.org/licenses/publicdomain * @@ -12,6 +12,7 @@ #include "lwt.h" #include "mem.h" #include "tls.h" +#include "rcu.h" #define RCU_POST_THRESHOLD 10 #define RCU_QUEUE_SCALE 20 @@ -23,13 +24,14 @@ typedef struct fifo { void *x[0]; } fifo_t; +#define MOD_SCALE(x, b) ((x) & MASK(b)) static uint64_t rcu_[MAX_NUM_THREADS][MAX_NUM_THREADS] = {}; static uint64_t rcu_last_posted_[MAX_NUM_THREADS][MAX_NUM_THREADS] = {}; static fifo_t *pending_[MAX_NUM_THREADS] = {}; static int num_threads_ = 0; static fifo_t *fifo_alloc(int scale) { - fifo_t *q = (fifo_t *)nbd_malloc(sizeof(fifo_t) + (1 << scale) * sizeof(void *)); + fifo_t *q = (fifo_t *)nbd_malloc(sizeof(fifo_t) + (1ULL << scale) * sizeof(void *)); memset(q, 0, sizeof(fifo_t)); q->scale = scale; q->head = 0; @@ -37,66 +39,56 @@ static fifo_t *fifo_alloc(int scale) { return q; } -static uint32_t fifo_index (fifo_t *q, uint32_t i) { - return i & MASK(q->scale); -} - -static void fifo_enqueue (fifo_t *q, void *x) { - assert(fifo_index(q, q->head + 1) != fifo_index(q, q->tail)); - uint32_t i = fifo_index(q, q->head++); - q->x[i] = x; -} - -static void *fifo_dequeue (fifo_t *q) { - uint32_t i = fifo_index(q, q->tail++); - return q->x[i]; -} - -void rcu_thread_init (int id) { - assert(id < MAX_NUM_THREADS); - if (pending_[id] == NULL) { - pending_[id] = fifo_alloc(RCU_QUEUE_SCALE); - SYNC_ADD(&num_threads_, 1); +void rcu_thread_init (void) { + int thread_index = GET_THREAD_INDEX(); + if (pending_[thread_index] == NULL) { + pending_[thread_index] = fifo_alloc(RCU_QUEUE_SCALE); + (void)SYNC_ADD(&num_threads_, 1); } } -static void rcu_post (uint64_t x) { - LOCALIZE_THREAD_LOCAL(tid_, int); - if (x - rcu_last_posted_[tid_][tid_] < RCU_POST_THRESHOLD) - return; - - int next_thread_id = (tid_ + 1) % num_threads_; - - TRACE("r0", "rcu_post: %llu", x, 0); - rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = x; -} - void rcu_update (void) { - LOCALIZE_THREAD_LOCAL(tid_, int); - assert(tid_ < num_threads_); - int next_thread_id = (tid_ + 1) % num_threads_; + int thread_index = GET_THREAD_INDEX(); + int next_thread_index = (thread_index + 1) % num_threads_; + TRACE("r1", "rcu_update: updating thread %llu", next_thread_index, 0); int i; for (i = 0; i < num_threads_; ++i) { - if (i == tid_) + if (i == thread_index) continue; // No need to post an update if the value hasn't changed - if (rcu_[tid_][i] == rcu_last_posted_[tid_][i]) + if (rcu_[thread_index][i] == rcu_last_posted_[thread_index][i]) continue; - uint64_t x = rcu_[tid_][i]; - rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x; + uint64_t x = rcu_[thread_index][i]; + rcu_[next_thread_index][i] = rcu_last_posted_[thread_index][i] = x; + TRACE("r2", "rcu_update: posted updated value (%llu) for thread %llu", x, i); } // free - while (pending_[tid_]->tail != rcu_[tid_][tid_]) { - nbd_free(fifo_dequeue(pending_[tid_])); + fifo_t *q = pending_[thread_index]; + while (q->tail != rcu_[thread_index][thread_index]) { + uint32_t i = MOD_SCALE(q->tail, q->scale); + TRACE("r0", "rcu_update: freeing %p from queue at position %llu", q->x[i], q->tail); + nbd_free(q->x[i]); + q->tail++; } } -void nbd_defer_free (void *x) { - LOCALIZE_THREAD_LOCAL(tid_, int); - fifo_enqueue(pending_[tid_], x); - TRACE("r0", "nbd_defer_free: put %p on queue at position %llu", x, pending_[tid_]->head); - rcu_post(pending_[tid_]->head); +void rcu_defer_free (void *x) { + assert(x); + int thread_index = GET_THREAD_INDEX(); + fifo_t *q = pending_[thread_index]; + assert(MOD_SCALE(q->head + 1, q->scale) != MOD_SCALE(q->tail, q->scale)); + uint32_t i = MOD_SCALE(q->head, q->scale); + q->x[i] = x; + TRACE("r0", "rcu_defer_free: put %p on queue at position %llu", x, q->head); + q->head++; + + if (pending_[thread_index]->head - rcu_last_posted_[thread_index][thread_index] >= RCU_POST_THRESHOLD) { + TRACE("r0", "rcu_defer_free: posting %llu", pending_[thread_index]->head, 0); + int next_thread_index = (thread_index + 1) % num_threads_; + rcu_[next_thread_index][thread_index] = pending_[thread_index]->head; + rcu_last_posted_[thread_index][thread_index] = pending_[thread_index]->head; + } }