]> pd.if.org Git - nbds/blob - runtime/rcu.c
Fix initialization ordering problem from last commit.
[nbds] / runtime / rcu.c
1 /* 
2  * Written by Josh Dybnis and released to the public domain, as explained at
3  * http://creativecommons.org/licenses/publicdomain
4  *
5  * safe memory reclemation using a simple technique from rcu
6  */
7 #include <string.h>
8 #include "common.h"
9 #include "runtime_local.h"
10 #include "lwt.h"
11 #include "mem.h"
12 #include "tls.h"
13
14 #define RCU_POST_THRESHOLD 10
15 #define RCU_QUEUE_SCALE 20
16
17 typedef struct fifo {
18     uint32_t head;
19     uint32_t tail;
20     uint32_t scale;
21     void *x[0];
22 } fifo_t;
23
24 static uint64_t rcu_[MAX_NUM_THREADS][MAX_NUM_THREADS] = {};
25 static uint64_t rcu_last_posted_[MAX_NUM_THREADS][MAX_NUM_THREADS] = {};
26 static fifo_t *pending_[MAX_NUM_THREADS] = {};
27 static int num_threads_ = 0;
28
29 static fifo_t *fifo_alloc(int scale) {
30     fifo_t *q = (fifo_t *)nbd_malloc(sizeof(fifo_t) + (1 << scale) * sizeof(void *)); 
31     memset(q, 0, sizeof(fifo_t));
32     q->scale = scale;
33     q->head = 0;
34     q->tail = 0;
35     return q;
36 }
37
38 static uint32_t fifo_index (fifo_t *q, uint32_t i) {
39     return i & MASK(q->scale);
40 }
41
42 static void fifo_enqueue (fifo_t *q, void *x) {
43     assert(fifo_index(q, q->head + 1) != fifo_index(q, q->tail));
44     uint32_t i = fifo_index(q, q->head++);
45     q->x[i] = x;
46 }
47
48 static void *fifo_dequeue (fifo_t *q) {
49     uint32_t i = fifo_index(q, q->tail++);
50     return q->x[i];
51 }
52
53 void rcu_thread_init (int id) {
54     assert(id < MAX_NUM_THREADS);
55     if (pending_[id] == NULL) {
56         pending_[id] = fifo_alloc(RCU_QUEUE_SCALE);
57         SYNC_ADD(&num_threads_, 1);
58     }
59 }
60
61 static void rcu_post (uint64_t x) {
62     LOCALIZE_THREAD_LOCAL(tid_, int);
63     if (x - rcu_last_posted_[tid_][tid_] < RCU_POST_THRESHOLD)
64         return;
65
66     int next_thread_id = (tid_ + 1) % num_threads_;
67
68     TRACE("r0", "rcu_post: %llu", x, 0);
69     rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = x;
70 }
71
72 void rcu_update (void) {
73     LOCALIZE_THREAD_LOCAL(tid_, int);
74     assert(tid_ < num_threads_);
75     int next_thread_id = (tid_ + 1) % num_threads_;
76     int i;
77     for (i = 0; i < num_threads_; ++i) {
78         if (i == tid_)
79             continue;
80
81         // No need to post an update if the value hasn't changed
82         if (rcu_[tid_][i] == rcu_last_posted_[tid_][i])
83             continue;
84
85         uint64_t x = rcu_[tid_][i];
86         rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x;
87     }
88
89     // free
90     while (pending_[tid_]->tail != rcu_[tid_][tid_]) {
91         nbd_free(fifo_dequeue(pending_[tid_]));
92     }
93 }
94
95 void nbd_defer_free (void *x) {
96     LOCALIZE_THREAD_LOCAL(tid_, int);
97     fifo_enqueue(pending_[tid_], x);
98     TRACE("r0", "nbd_defer_free: put %p on queue at position %llu", x, pending_[tid_]->head);
99     rcu_post(pending_[tid_]->head);
100 }
101
102 #ifdef MAKE_rcu_test
103 #include <errno.h>
104 #include <stdio.h>
105 #include "runtime.h"
106
107 #define NUM_ITERATIONS 10000000
108
109 typedef struct node {
110     struct node *next;
111 } node_t;
112
113 typedef struct lifo {
114     node_t *head;
115 } lifo_t;
116
117 static volatile int wait_;
118 static lifo_t *stk_;
119
120 static lifo_t *lifo_alloc (void) {
121     lifo_t *stk = (lifo_t *)nbd_malloc(sizeof(lifo_t)); 
122     memset(stk, 0, sizeof(lifo_t));
123     return stk;
124 }
125
126 static void lifo_aba_push (lifo_t *stk, node_t *x) {
127     node_t *head;
128     do {
129         head = ((volatile lifo_t *)stk)->head;
130         ((volatile node_t *)x)->next = head;
131     } while (__sync_val_compare_and_swap(&stk->head, head, x) != head);
132 }
133
134 node_t *lifo_aba_pop (lifo_t *stk) {
135     node_t *head;
136     do {
137         head = ((volatile lifo_t *)stk)->head;
138         if (head == NULL)
139             return NULL;
140     } while (__sync_val_compare_and_swap(&stk->head, head, head->next) != head);
141     head->next = NULL;
142     return head;
143 }
144
145 node_t *node_alloc (void) {
146     node_t *node = (node_t *)nbd_malloc(sizeof(node_t));
147     memset(node, 0, sizeof(node_t));
148     return node;
149 }
150
151 void *worker (void *arg) {
152     int id = (int)(size_t)arg;
153     unsigned int rand_seed = (unsigned int)id + 1;
154
155     // Wait for all the worker threads to be ready.
156     __sync_fetch_and_add(&wait_, -1);
157     do {} while (wait_); 
158
159     int i;
160     for (i = 0; i < NUM_ITERATIONS; ++ i) {
161         int n = rand_r(&rand_seed);
162         if (n & 0x1) {
163             lifo_aba_push(stk_, node_alloc());
164         } else {
165             node_t *x = lifo_aba_pop(stk_);
166             if (x) {
167                 nbd_defer_free(x);
168             }
169         }
170         rcu_update();
171     }
172
173     return NULL;
174 }
175
176 int main (int argc, char **argv) {
177     nbd_init();
178     //lwt_set_trace_level("m0r0");
179
180     int num_threads = 2;
181     if (argc == 2)
182     {
183         errno = 0;
184         num_threads = strtol(argv[1], NULL, 10);
185         if (errno) {
186             fprintf(stderr, "%s: Invalid argument for number of threads\n", argv[0]);
187             return -1;
188         }
189         if (num_threads <= 0) {
190             fprintf(stderr, "%s: Number of threads must be at least 1\n", argv[0]);
191             return -1;
192         }
193     }
194
195     stk_ = lifo_alloc();
196     wait_ = num_threads;
197
198     pthread_t thread[num_threads];
199     for (int i = 0; i < num_threads; ++i) {
200         int rc = nbd_thread_create(thread + i, i, worker, (void *)(size_t)i);
201         if (rc != 0) { perror("pthread_create"); return rc; }
202     }
203     for (int i = 0; i < num_threads; ++i) {
204         pthread_join(thread[i], NULL);
205     }
206
207     return 0;
208 }
209 #endif//rcu_test