#include <sys/types.h>
#define malloc "DON'T USE MALLOC" // use nbd_malloc() instead
+#define calloc "DON'T USE CALLOC" // use nbd_malloc() instead
#define free "DON'T USE FREE" // use nbd_free() instead
#define MAX_NUM_THREADS 4 // make this whatever you want, but make it a power of 2
#define ERROR_INVALID_OPTION (-1)
#define ERROR_INVALID_ARGUMENT (-2)
#define ERROR_UNSUPPORTED_FEATURE (-3)
+#define ERROR_TXN_NOT_RUNNING (-4)
typedef unsigned long long uint64_t;
typedef unsigned int uint32_t;
#include "map.h"
typedef struct ht hashtable_t;
+typedef struct ht_iter ht_iter_t;
hashtable_t *ht_alloc (const datatype_t *key_type);
uint64_t ht_cas (hashtable_t *ht, void *key, uint64_t expected_val, uint64_t val);
void ht_print (hashtable_t *ht);
void ht_free (hashtable_t *ht);
+ht_iter_t *ht_iter_start (hashtable_t *ht, void *key);
+ht_iter_t *ht_iter_next (ht_iter_t *iter);
+uint64_t ht_iter_val (ht_iter_t *iter);
+uint64_t ht_iter_key (ht_iter_t *iter);
+void ht_iter_free (ht_iter_t *iter);
+
static const map_impl_t ht_map_impl = {
(map_alloc_t)ht_alloc, (map_cas_t)ht_cas, (map_get_t)ht_get, (map_remove_t)ht_remove,
(map_count_t)ht_count, (map_print_t)ht_print, (map_free_t)ht_free
#include "map.h"
typedef struct ll list_t;
+typedef struct ll_iter ll_iter_t;
-list_t * ll_alloc (const datatype_t *key_type);
-uint64_t ll_cas (list_t *ll, void *key, uint64_t expected_val, uint64_t new_val);
-uint64_t ll_lookup (list_t *ll, void *key);
-uint64_t ll_remove (list_t *ll, void *key);
-uint64_t ll_count (list_t *ll);
-void ll_print (list_t *ll);
-void ll_free (list_t *ll);
+list_t * ll_alloc (const datatype_t *key_type);
+uint64_t ll_cas (list_t *ll, void *key, uint64_t expected_val, uint64_t new_val);
+uint64_t ll_lookup (list_t *ll, void *key);
+uint64_t ll_remove (list_t *ll, void *key);
+uint64_t ll_count (list_t *ll);
+void ll_print (list_t *ll);
+void ll_free (list_t *ll);
+void * ll_min_key (list_t *sl);
+
+ll_iter_t *ll_iter_start (list_t *ll, void *key);
+ll_iter_t *ll_iter_next (ll_iter_t *iter);
+uint64_t ll_iter_val (ll_iter_t *iter);
+void * ll_iter_key (ll_iter_t *iter);
static const map_impl_t ll_map_impl = {
(map_alloc_t)ll_alloc, (map_cas_t)ll_cas, (map_get_t)ll_lookup, (map_remove_t)ll_remove,
#include "map.h"
typedef struct sl skiplist_t;
+typedef struct sl_iter sl_iter_t;
-skiplist_t * sl_alloc (const datatype_t *key_type);
-uint64_t sl_cas (skiplist_t *sl, void *key, uint64_t expected_val, uint64_t new_val);
-uint64_t sl_lookup (skiplist_t *sl, void *key);
-uint64_t sl_remove (skiplist_t *sl, void *key);
-uint64_t sl_count (skiplist_t *sl);
-void sl_print (skiplist_t *sl);
-void sl_free (skiplist_t *sl);
-void * sl_min_key(skiplist_t *sl);
+skiplist_t *sl_alloc (const datatype_t *key_type);
+uint64_t sl_cas (skiplist_t *sl, void *key, uint64_t expected_val, uint64_t new_val);
+uint64_t sl_lookup (skiplist_t *sl, void *key);
+uint64_t sl_remove (skiplist_t *sl, void *key);
+uint64_t sl_count (skiplist_t *sl);
+void sl_print (skiplist_t *sl);
+void sl_free (skiplist_t *sl);
+void * sl_min_key (skiplist_t *sl);
+
+sl_iter_t *sl_iter_start (skiplist_t *sl, void *key);
+sl_iter_t *sl_iter_next (sl_iter_t *iter);
+uint64_t sl_iter_val (sl_iter_t *iter);
+void * sl_iter_key (sl_iter_t *iter);
static const map_impl_t sl_map_impl = {
(map_alloc_t)sl_alloc, (map_cas_t)sl_cas, (map_get_t)sl_lookup, (map_remove_t)sl_remove,
#define GET_PTR(x) ((void *)((x) & MASK(48))) // low-order 48 bits is a pointer to a nstring_t
-typedef struct ht_entry {
+typedef struct entry {
uint64_t key;
uint64_t val;
} entry_t;
volatile entry_t *table;
hashtable_t *ht; // parent ht;
struct hti *next;
- unsigned int scale;
+ unsigned scale;
int max_probe;
+ int references;
int count; // TODO: make these counters distributed
int num_entries_copied;
- int scan;
+ int copy_scan;
} hti_t;
+// Iterator over a single table (hti_t) of a hashtable. Created by ht_iter_start() and advanced
+// by ht_iter_next().
+struct ht_iter {
+ hti_t * hti; // table being iterated; ht_iter_start() takes a reference on it
+ int64_t idx; // index of the slot visited last; ht_iter_start() initializes it to -1
+ uint64_t key; // key of the current entry, cached by ht_iter_next()
+ uint64_t val; // value of the current entry, cached by ht_iter_next()
+};
+
struct ht {
hti_t *hti;
const datatype_t *key_type;
// Allocate and initialize a hti_t with 2^<scale> entries.
static hti_t *hti_alloc (hashtable_t *parent, int scale) {
- // Include enough slop to align the actual table on a cache line boundry
- size_t n = sizeof(hti_t)
- + sizeof(entry_t) * (1 << scale)
- + (CACHE_LINE_SIZE - 1);
- hti_t *hti = (hti_t *)calloc(n, 1);
+ hti_t *hti = (hti_t *)nbd_malloc(sizeof(hti_t));
+ memset(hti, 0, sizeof(hti_t));
- // Align the table of hash entries on a cache line boundry.
- hti->table = (entry_t *)(((uint64_t)hti + sizeof(hti_t) + (CACHE_LINE_SIZE-1))
- & ~(CACHE_LINE_SIZE-1));
+ size_t sz = sizeof(entry_t) * (1 << scale);
+ entry_t *table = nbd_malloc(sz);
+ memset(table, 0, sz);
+ hti->table = table;
hti->scale = scale;
return hti_get(ht->hti, key, hash);
}
-//
-uint64_t ht_cas (hashtable_t *ht, void *key, uint64_t expected_val, uint64_t new_val) {
+// returns TRUE if copy is done
+int hti_help_copy (hti_t *hti) {
+ volatile entry_t *ent;
+ uint64_t limit;
+ uint64_t total_copied = hti->num_entries_copied;
+ int num_copied = 0;
+ int x = hti->copy_scan;
- TRACE("h2", "ht_cas: key %p ht %p", key, ht);
- TRACE("h2", "ht_cas: expected val %p new val %p", expected_val, new_val);
- assert(key != DOES_NOT_EXIST);
- assert(!IS_TAGGED(new_val, TAG1) && new_val != DOES_NOT_EXIST && new_val != TOMBSTONE);
-
- hti_t *hti = ht->hti;
-
- // Help with an ongoing copy.
- if (EXPECT_FALSE(hti->next != NULL)) {
- volatile entry_t *ent;
- uint64_t limit;
- int num_copied = 0;
- int x = hti->scan;
-
- TRACE("h1", "ht_cas: help copy. scan is %llu, size is %llu", x, 1<<hti->scale);
+ TRACE("h1", "ht_cas: help copy. scan is %llu, size is %llu", x, 1<<hti->scale);
+ if (total_copied == (1 << hti->scale)) {
// Panic if we've been around the array twice and still haven't finished the copy.
int panic = (x >= (1 << (hti->scale + 1)));
if (!panic) {
// Reserve some entries for this thread to copy. There is a race condition here because the
// fetch and add isn't atomic, but that is ok.
- hti->scan = x + ENTRIES_PER_COPY_CHUNK;
+ hti->copy_scan = x + ENTRIES_PER_COPY_CHUNK;
- // <hti->scan> might be larger than the size of the table, if some thread stalls while
+ // <copy_scan> might be larger than the size of the table, if some thread stalls while
// copying. In that case we just wrap around to the begining and make another pass through
// the table.
ent = hti->table + (x & MASK(hti->scale));
assert(ent <= hti->table + (1 << hti->scale));
}
if (num_copied != 0) {
- SYNC_ADD(&hti->num_entries_copied, num_copied);
+ total_copied = SYNC_ADD(&hti->num_entries_copied, num_copied);
}
+ }
+
+ return (total_copied == (1 << hti->scale));
+}
+
+//
+uint64_t ht_cas (hashtable_t *ht, void *key, uint64_t expected_val, uint64_t new_val) {
+
+ TRACE("h2", "ht_cas: key %p ht %p", key, ht);
+ TRACE("h2", "ht_cas: expected val %p new val %p", expected_val, new_val);
+ assert(key != DOES_NOT_EXIST);
+ assert(!IS_TAGGED(new_val, TAG1) && new_val != DOES_NOT_EXIST && new_val != TOMBSTONE);
+
+ hti_t *hti = ht->hti;
+
+ // Help with an ongoing copy.
+ if (EXPECT_FALSE(hti->next != NULL)) {
+ int done = hti_help_copy(hti);
// Dispose of fully copied tables.
- if (hti->num_entries_copied == (1 << hti->scale) || panic) {
- assert(hti->next);
- if (SYNC_CAS(&ht->hti, hti, hti->next) == hti) {
- nbd_defer_free(hti);
+ if (done && hti->references == 0) {
+
+ int r = SYNC_CAS(&hti->references, 0, -1);
+ if (r == 0) {
+ assert(hti->next);
+ if (SYNC_CAS(&ht->hti, hti, hti->next) == hti) {
+ nbd_defer_free((void *)hti->table);
+ nbd_defer_free(hti);
+ }
}
}
}
}
}
hti_t *next = hti->next;
+ nbd_free((void *)hti->table);
nbd_free(hti);
hti = next;
} while (hti);
hti = hti->next;
}
}
+
+// Create an iterator over <ht>.
+//
+// Walks to the table at the end of the <next> chain, helping each pending copy run to completion
+// on the way, then takes a reference on that table; ht_cas() only disposes of a table whose
+// reference count is 0. A count of -1 means the table has already been claimed for disposal, in
+// which case the whole search is repeated. <key> is not used by this implementation.
+//
+// The iterator returned is positioned before the first slot; call ht_iter_next() to reach the
+// first entry.
+ht_iter_t *ht_iter_start (hashtable_t *ht, void *key) {
+    hti_t *table = ht->hti;
+    for (;;) {
+        // Finish every pending copy and move on to the newest table.
+        while (((volatile hti_t *)table)->next != NULL) {
+            while (hti_help_copy(table) != TRUE) { }
+            table = table->next;
+        }
+
+        // Try to bump the reference count, retrying whenever another thread changes it first.
+        int seen = table->references;
+        while (seen != -1) {
+            int prev = SYNC_CAS(&table->references, seen, seen + 1);
+            if (prev == seen)
+                break;
+            seen = prev;
+        }
+        if (seen != -1)
+            break; // reference taken
+    }
+
+    ht_iter_t *iter = nbd_malloc(sizeof(ht_iter_t));
+    iter->idx = -1;
+    iter->hti = table;
+
+    return iter;
+}
+
+// Advance <iter> to the next live entry of its table.
+//
+// Slots whose key is DOES_NOT_EXIST, or whose value is DOES_NOT_EXIST or TOMBSTONE, are skipped.
+// When a slot holds COPIED_VALUE the value is looked up in the successor table (<hti->next>)
+// using the key's hash. The key and value found are cached in <iter> for ht_iter_key() and
+// ht_iter_val().
+//
+// Returns <iter>, or NULL once every slot has been visited. On the NULL return the iterator has
+// already been handed to ht_iter_free() and must not be used again.
+ht_iter_t *ht_iter_next (ht_iter_t *iter) {
+    volatile entry_t *ent;
+    uint64_t key;
+    uint64_t val;
+    // Shift in 64 bits so the table size is computed correctly for any scale.
+    uint64_t table_size = ((uint64_t)1 << iter->hti->scale);
+    do {
+        // Step exactly one slot per pass. Incrementing the index both here and in the table
+        // subscript made the iterator skip every other slot.
+        iter->idx++;
+        if ((uint64_t)iter->idx == table_size) {
+            ht_iter_free(iter);
+            return NULL;
+        }
+        ent = &iter->hti->table[iter->idx];
+        key = ent->key;
+        val = ent->val;
+
+    } while (key == DOES_NOT_EXIST || val == DOES_NOT_EXIST || val == TOMBSTONE);
+
+    iter->key = key;
+    if (val == COPIED_VALUE) {
+        // The entry has moved to the next table; fetch its value from there.
+        uint32_t hash = (iter->hti->ht->key_type == NULL)
+                      ? murmur32_8b(key)
+                      : iter->hti->ht->key_type->hash((void *)key);
+        iter->val = hti_get(iter->hti->next, (void *)key, hash);
+    } else {
+        iter->val = val;
+    }
+
+    return iter;
+}
+
+// Return the value cached in <iter> by the most recent ht_iter_next() call.
+uint64_t ht_iter_val (ht_iter_t *iter) {
+ return iter->val;
+}
+
+// Return the key cached in <iter> by the most recent ht_iter_next() call.
+uint64_t ht_iter_key (ht_iter_t *iter) {
+ return iter->key;
+}
+
+// Release an iterator obtained from ht_iter_start().
+//
+// Drops the reference the iterator holds on its table (ht_cas() only disposes of a fully copied
+// table once <references> is 0) and frees the iterator itself, which ht_iter_start() allocated
+// with nbd_malloc(). <iter> must not be used after this call.
+void ht_iter_free (ht_iter_t *iter) {
+    SYNC_ADD(&iter->hti->references, -1);
+    nbd_free(iter); // previously leaked: nothing else releases the iterator
+}
+
#include "list.h"
#include "mem.h"
-typedef struct node {
+typedef struct ll_iter node_t;
+
+struct ll_iter {
void *key;
uint64_t val;
- struct node *next;
-} node_t;
+ node_t *next;
+};
struct ll {
node_t *head;
}
printf("\n");
}
+
+// Begin an iteration of <ll> at <key>. Passes <key> to find_pred() and returns the node it stores
+// in <item>; list nodes double as iterators (node_t is struct ll_iter).
+// NOTE(review): presumably the node returned is the first one whose key is >= <key>, or NULL if
+// there is none -- confirm against find_pred().
+ll_iter_t *ll_iter_start (list_t *ll, void *key) {
+ node_t *item;
+ find_pred(NULL, &item, ll, key, FALSE);
+ return item;
+}
+
+// Return the node that follows <iter>, skipping every node whose own next pointer is tagged with
+// TAG1 (presumably the mark for a logically removed node -- confirm against ll_remove()).
+// Returns NULL when there is no such node, or when <iter> is NULL in a build without asserts.
+ll_iter_t *ll_iter_next (ll_iter_t *iter) {
+    assert(iter);
+    if (EXPECT_FALSE(!iter))
+        return NULL;
+
+    // <iter> is itself a list node, so its next pointer can carry the tag too. Strip it before
+    // following the pointer; a tagged value must never be dereferenced.
+    node_t *next = iter->next;
+    if (IS_TAGGED(next, TAG1)) {
+        next = (node_t *)STRIP_TAG(next, TAG1);
+    }
+
+    while (next != NULL && IS_TAGGED(next->next, TAG1)) {
+        next = (node_t *)STRIP_TAG(next->next, TAG1);
+    }
+
+    return next;
+}
+
+// Return the value stored in the list node <iter> refers to.
+uint64_t ll_iter_val (ll_iter_t *iter) {
+ return iter->val;
+}
+
+// Return the key stored in the list node <iter> refers to.
+void *ll_iter_key (ll_iter_t *iter) {
+ return iter->key;
+}
// Setting MAX_LEVEL to 0 essentially makes this data structure the Harris-Michael lock-free list (in list.c).
#define MAX_LEVEL 31
-typedef struct node {
+typedef struct sl_iter node_t;
+
+struct sl_iter {
void *key;
uint64_t val;
int top_level;
- struct node *next[];
-} node_t;
+ node_t *next[];
+};
struct sl {
node_t *head;
}
}
}
+
+// Begin an iteration of <sl> at <key>. Passes <key> to find_preds() and returns the node it
+// stores in <item>; skiplist nodes double as iterators (node_t is struct sl_iter).
+// NOTE(review): presumably the node returned is the first one whose key is >= <key>, or NULL if
+// there is none -- confirm against find_preds().
+sl_iter_t *sl_iter_start (skiplist_t *sl, void *key) {
+ node_t *item;
+ find_preds(NULL, &item, 0, sl, key, FALSE);
+ return item;
+}
+
+// Return the node that follows <iter> on the bottom level (next[0]), skipping every node whose
+// own next[0] pointer is tagged with TAG1 (presumably the mark for a logically removed node --
+// confirm against sl_remove()). Returns NULL when there is no such node, or when <iter> is NULL
+// in a build without asserts.
+sl_iter_t *sl_iter_next (sl_iter_t *iter) {
+    assert(iter);
+    if (EXPECT_FALSE(!iter))
+        return NULL;
+
+    // <iter> is itself a skiplist node, so its next[0] pointer can carry the tag too. Strip it
+    // before following the pointer; a tagged value must never be dereferenced.
+    node_t *next = iter->next[0];
+    if (IS_TAGGED(next, TAG1)) {
+        next = (node_t *)STRIP_TAG(next, TAG1);
+    }
+
+    while (next != NULL && IS_TAGGED(next->next[0], TAG1)) {
+        next = (node_t *)STRIP_TAG(next->next[0], TAG1);
+    }
+
+    return next;
+}
+
+// Return the value stored in the skiplist node <iter> refers to.
+uint64_t sl_iter_val (sl_iter_t *iter) {
+ return iter->val;
+}
+
+// Return the key stored in the skiplist node <iter> refers to.
+void *sl_iter_key (sl_iter_t *iter) {
+ return iter->key;
+}
ASSERT_EQUAL( 3, tm_get(t1, k1) );
ASSERT_EQUAL( 4, tm_get(t2, k1) );
ASSERT_EQUAL( TXN_VALIDATED, txn_commit(t2));
- ASSERT_EQUAL( TXN_ABORTED, txn_commit(t1));
+ ASSERT_EQUAL( TXN_ABORTED, txn_commit(t1));
}
int main (void) {
// complete validating. It must be finished before we can decide to rollback or commit.
//
static txn_state_e tm_validate_key (txn_t *txn, void *key) {
+ assert(txn->state != TXN_RUNNING);
update_t *update = (update_t *) map_get(txn->map, key);
for (; update != NULL; update = update->next) {
}
static txn_state_e txn_validate (txn_t *txn) {
+ assert(txn->state != TXN_RUNNING);
int i;
switch (txn->state) {
txn->state = TXN_ABORTED;
break;
}
+ assert(s == TXN_VALIDATED);
}
if (txn->state == TXN_VALIDATING) {
txn->state = TXN_VALIDATED;
}
void txn_abort (txn_t *txn) {
+ if (txn->state != TXN_RUNNING)
+ return; // TODO: return some sort of error code
int i;
for (i = 0; i < txn->writes_count; ++i) {
}
txn_state_e txn_commit (txn_t *txn) {
+ if (txn->state != TXN_RUNNING)
+ return txn->state; // TODO: return some sort of error code
assert(txn->state == TXN_RUNNING);
txn->state = TXN_VALIDATING;
// Get most recent committed version prior to our read version.
uint64_t tm_get (txn_t *txn, void *key) {
+ if (txn->state != TXN_RUNNING)
+ return ERROR_TXN_NOT_RUNNING;
update_t *newest_update = (update_t *) map_get(txn->map, key);
if (!IS_TAGGED(newest_update, TAG2))
}
void tm_set (txn_t *txn, void *key, uint64_t value) {
+ if (txn->state != TXN_RUNNING)
+ return; // TODO: return some sort of error code
// create a new update record
update_t *update = alloc_update_rec();