]> pd.if.org Git - nbds/commitdiff
add iterators to hashtable, skiplist, and list
authorjdybnis <jdybnis@9ec2166a-aeea-11dd-8830-69e4bb380a4a>
Tue, 9 Dec 2008 08:20:43 +0000 (08:20 +0000)
committerjdybnis <jdybnis@9ec2166a-aeea-11dd-8830-69e4bb380a4a>
Tue, 9 Dec 2008 08:20:43 +0000 (08:20 +0000)
include/common.h
include/hashtable.h
include/list.h
include/skiplist.h
map/hashtable.c
map/list.c
map/skiplist.c
test/txn_test.c
txn/txn.c

index faebe4c807160d7f093bb34a85acd0e584dcc305..0d493c5ed72882a50822d99ac8c9f8a6047cbed9 100644 (file)
@@ -12,6 +12,7 @@
 #include <sys/types.h>
 
 #define malloc "DON'T USE MALLOC" // use nbd_malloc() instead
+#define calloc "DON'T USE CALLOC" // use nbd_malloc() instead
 #define free   "DON'T USE FREE"   // use nbd_free() instead
 
 #define MAX_NUM_THREADS 4 // make this whatever you want, but make it a power of 2
@@ -41,6 +42,7 @@
 #define ERROR_INVALID_OPTION (-1)
 #define ERROR_INVALID_ARGUMENT (-2)
 #define ERROR_UNSUPPORTED_FEATURE (-3)
+#define ERROR_TXN_NOT_RUNNING (-4)
 
 typedef unsigned long long uint64_t;
 typedef unsigned int       uint32_t;
index 9a32abc94746fc2bb966013572f7b209c688af4f..5ae84c11db2d670df7c44523cbf02bf2b391ee97 100644 (file)
@@ -4,6 +4,7 @@
 #include "map.h"
 
 typedef struct ht hashtable_t;
+typedef struct ht_iter ht_iter_t;
 
 hashtable_t *ht_alloc (const datatype_t *key_type);
 uint64_t ht_cas    (hashtable_t *ht, void *key, uint64_t expected_val, uint64_t val);
@@ -13,6 +14,12 @@ uint64_t ht_count  (hashtable_t *ht);
 void     ht_print  (hashtable_t *ht);
 void     ht_free   (hashtable_t *ht);
 
+ht_iter_t *ht_iter_start (hashtable_t *ht, void *key);
+ht_iter_t *ht_iter_next  (ht_iter_t *iter);
+uint64_t   ht_iter_val   (ht_iter_t *iter);
+uint64_t   ht_iter_key   (ht_iter_t *iter);
+void       ht_iter_free  (ht_iter_t *iter);
+
 static const map_impl_t ht_map_impl = { 
     (map_alloc_t)ht_alloc, (map_cas_t)ht_cas, (map_get_t)ht_get, (map_remove_t)ht_remove, 
     (map_count_t)ht_count, (map_print_t)ht_print, (map_free_t)ht_free
index fed7fcda72962865db5db16afcb6943307e55f38..e4cba2d93d7f3344497491159990d00b4f209473 100644 (file)
@@ -4,14 +4,21 @@
 #include "map.h"
 
 typedef struct ll list_t;
+typedef struct ll_iter ll_iter_t;
 
-list_t * ll_alloc  (const datatype_t *key_type);
-uint64_t ll_cas    (list_t *ll, void *key, uint64_t expected_val, uint64_t new_val);
-uint64_t ll_lookup (list_t *ll, void *key);
-uint64_t ll_remove (list_t *ll, void *key);
-uint64_t ll_count  (list_t *ll);
-void     ll_print  (list_t *ll);
-void     ll_free   (list_t *ll);
+list_t * ll_alloc   (const datatype_t *key_type);
+uint64_t ll_cas     (list_t *ll, void *key, uint64_t expected_val, uint64_t new_val);
+uint64_t ll_lookup  (list_t *ll, void *key);
+uint64_t ll_remove  (list_t *ll, void *key);
+uint64_t ll_count   (list_t *ll);
+void     ll_print   (list_t *ll);
+void     ll_free    (list_t *ll);
+void *   ll_min_key (list_t *sl);
+
+ll_iter_t *ll_iter_start (list_t *ll, void *key);
+ll_iter_t *ll_iter_next  (ll_iter_t *iter);
+uint64_t   ll_iter_val   (ll_iter_t *iter);
+void *     ll_iter_key   (ll_iter_t *iter);
 
 static const map_impl_t ll_map_impl = { 
     (map_alloc_t)ll_alloc, (map_cas_t)ll_cas, (map_get_t)ll_lookup, (map_remove_t)ll_remove, 
index 08d32cfa0001d8c4fc0d63cb3c98df33f587093b..cf70656245da2efaf8b9b82d75837a61f037bcea 100644 (file)
@@ -4,15 +4,21 @@
 #include "map.h"
 
 typedef struct sl skiplist_t;
+typedef struct sl_iter sl_iter_t;
 
-skiplist_t * sl_alloc (const datatype_t *key_type);
-uint64_t sl_cas    (skiplist_t *sl, void *key, uint64_t expected_val, uint64_t new_val);
-uint64_t sl_lookup (skiplist_t *sl, void *key);
-uint64_t sl_remove (skiplist_t *sl, void *key);
-uint64_t sl_count  (skiplist_t *sl);
-void     sl_print  (skiplist_t *sl);
-void     sl_free   (skiplist_t *sl);
-void *   sl_min_key(skiplist_t *sl);
+skiplist_t *sl_alloc (const datatype_t *key_type);
+uint64_t sl_cas     (skiplist_t *sl, void *key, uint64_t expected_val, uint64_t new_val);
+uint64_t sl_lookup  (skiplist_t *sl, void *key);
+uint64_t sl_remove  (skiplist_t *sl, void *key);
+uint64_t sl_count   (skiplist_t *sl);
+void     sl_print   (skiplist_t *sl);
+void     sl_free    (skiplist_t *sl);
+void *   sl_min_key (skiplist_t *sl);
+
+sl_iter_t *sl_iter_start (skiplist_t *sl, void *key);
+sl_iter_t *sl_iter_next  (sl_iter_t *iter);
+uint64_t   sl_iter_val   (sl_iter_t *iter);
+void *     sl_iter_key   (sl_iter_t *iter);
 
 static const map_impl_t sl_map_impl = { 
     (map_alloc_t)sl_alloc, (map_cas_t)sl_cas, (map_get_t)sl_lookup, (map_remove_t)sl_remove, 
index f9bf715e7aeeee79310c8e9e3256a21a30258af3..b4d7a5de3976db050fa9cf89a6c84b833b79fe24 100644 (file)
@@ -20,7 +20,7 @@
 
 #define GET_PTR(x) ((void *)((x) & MASK(48))) // low-order 48 bits is a pointer to a nstring_t
 
-typedef struct ht_entry {
+typedef struct entry {
     uint64_t key;
     uint64_t val;
 } entry_t;
@@ -29,13 +29,21 @@ typedef struct hti {
     volatile entry_t *table;
     hashtable_t *ht; // parent ht;
     struct hti *next;
-    unsigned int scale;
+    unsigned scale;
     int max_probe;
+    int references;
     int count; // TODO: make these counters distributed
     int num_entries_copied;
-    int scan;
+    int copy_scan;
 } hti_t;
 
+struct ht_iter {
+    hti_t *  hti;
+    int64_t  idx;
+    uint64_t key;
+    uint64_t val;
+};
+
 struct ht {
     hti_t *hti;
     const datatype_t *key_type;
@@ -117,15 +125,13 @@ static volatile entry_t *hti_lookup (hti_t *hti, void *key, uint32_t key_hash, i
 
 // Allocate and initialize a hti_t with 2^<scale> entries.
 static hti_t *hti_alloc (hashtable_t *parent, int scale) {
-    // Include enough slop to align the actual table on a cache line boundry
-    size_t n = sizeof(hti_t) 
-             + sizeof(entry_t) * (1 << scale) 
-             + (CACHE_LINE_SIZE - 1);
-    hti_t *hti = (hti_t *)calloc(n, 1);
+    hti_t *hti = (hti_t *)nbd_malloc(sizeof(hti_t));
+    memset(hti, 0, sizeof(hti_t));
 
-    // Align the table of hash entries on a cache line boundry.
-    hti->table = (entry_t *)(((uint64_t)hti + sizeof(hti_t) + (CACHE_LINE_SIZE-1)) 
-                            & ~(CACHE_LINE_SIZE-1));
+    size_t sz = sizeof(entry_t) * (1 << scale);
+    entry_t *table = nbd_malloc(sz);
+    memset(table, 0, sz);
+    hti->table = table;
 
     hti->scale = scale;
 
@@ -432,24 +438,16 @@ uint64_t ht_get (hashtable_t *ht, void *key) {
     return hti_get(ht->hti, key, hash);
 }
 
-//
-uint64_t ht_cas (hashtable_t *ht, void *key, uint64_t expected_val, uint64_t new_val) {
+// returns TRUE if copy is done
+int hti_help_copy (hti_t *hti) {
+    volatile entry_t *ent;
+    uint64_t limit; 
+    uint64_t total_copied = hti->num_entries_copied;
+    int num_copied = 0;
+    int x = hti->copy_scan; 
 
-    TRACE("h2", "ht_cas: key %p ht %p", key, ht);
-    TRACE("h2", "ht_cas: expected val %p new val %p", expected_val, new_val);
-    assert(key != DOES_NOT_EXIST);
-    assert(!IS_TAGGED(new_val, TAG1) && new_val != DOES_NOT_EXIST && new_val != TOMBSTONE);
-
-    hti_t *hti = ht->hti;
-
-    // Help with an ongoing copy.
-    if (EXPECT_FALSE(hti->next != NULL)) {
-        volatile entry_t *ent;
-        uint64_t limit; 
-        int num_copied = 0;
-        int x = hti->scan; 
-
-        TRACE("h1", "ht_cas: help copy. scan is %llu, size is %llu", x, 1<<hti->scale);
+    TRACE("h1", "ht_cas: help copy. scan is %llu, size is %llu", x, 1<<hti->scale);
+    if (total_copied == (1 << hti->scale)) {
         // Panic if we've been around the array twice and still haven't finished the copy.
         int panic = (x >= (1 << (hti->scale + 1))); 
         if (!panic) {
@@ -457,9 +455,9 @@ uint64_t ht_cas (hashtable_t *ht, void *key, uint64_t expected_val, uint64_t new
 
             // Reserve some entries for this thread to copy. There is a race condition here because the
             // fetch and add isn't atomic, but that is ok.
-            hti->scan = x + ENTRIES_PER_COPY_CHUNK; 
+            hti->copy_scan = x + ENTRIES_PER_COPY_CHUNK; 
 
-            // <hti->scan> might be larger than the size of the table, if some thread stalls while 
+            // <copy_scan> might be larger than the size of the table, if some thread stalls while 
             // copying. In that case we just wrap around to the begining and make another pass through
             // the table.
             ent = hti->table + (x & MASK(hti->scale));
@@ -476,14 +474,37 @@ uint64_t ht_cas (hashtable_t *ht, void *key, uint64_t expected_val, uint64_t new
             assert(ent <= hti->table + (1 << hti->scale));
         }
         if (num_copied != 0) {
-            SYNC_ADD(&hti->num_entries_copied, num_copied);
+            total_copied = SYNC_ADD(&hti->num_entries_copied, num_copied);
         }
+    }
+
+    return (total_copied == (1 << hti->scale));
+}
+
+//
+uint64_t ht_cas (hashtable_t *ht, void *key, uint64_t expected_val, uint64_t new_val) {
+
+    TRACE("h2", "ht_cas: key %p ht %p", key, ht);
+    TRACE("h2", "ht_cas: expected val %p new val %p", expected_val, new_val);
+    assert(key != DOES_NOT_EXIST);
+    assert(!IS_TAGGED(new_val, TAG1) && new_val != DOES_NOT_EXIST && new_val != TOMBSTONE);
+
+    hti_t *hti = ht->hti;
+
+    // Help with an ongoing copy.
+    if (EXPECT_FALSE(hti->next != NULL)) {
+        int done = hti_help_copy(hti);
 
         // Dispose of fully copied tables.
-        if (hti->num_entries_copied == (1 << hti->scale) || panic) {
-            assert(hti->next);
-            if (SYNC_CAS(&ht->hti, hti, hti->next) == hti) {
-                nbd_defer_free(hti); 
+        if (done && hti->references == 0) {
+
+            int r = SYNC_CAS(&hti->references, 0, -1);
+            if (r == 0) {
+                assert(hti->next);
+                if (SYNC_CAS(&ht->hti, hti, hti->next) == hti) {
+                    nbd_defer_free((void *)hti->table); 
+                    nbd_defer_free(hti); 
+                }
             }
         }
     }
@@ -544,6 +565,7 @@ void ht_free (hashtable_t *ht) {
             }
         }
         hti_t *next = hti->next;
+        nbd_free((void *)hti->table);
         nbd_free(hti);
         hti = next;
     } while (hti);
@@ -565,3 +587,70 @@ void ht_print (hashtable_t *ht) {
         hti = hti->next;
     }
 }
+
+ht_iter_t *ht_iter_start (hashtable_t *ht, void *key) {
+    hti_t *hti = ht->hti;
+    int rcount;
+    do {
+        while (((volatile hti_t *)hti)->next != NULL) {
+            do { } while (hti_help_copy(hti) != TRUE);
+            hti = hti->next;
+        }
+
+        int old = hti->references;
+        do {
+            rcount = old;
+            if (rcount != -1) {
+                old = SYNC_CAS(&hti->references, rcount, rcount + 1);
+            }
+        } while (rcount != old);
+    } while (rcount == -1);
+
+    ht_iter_t *iter = nbd_malloc(sizeof(ht_iter_t));
+    iter->hti = hti;
+    iter->idx = -1;
+
+    return iter;
+}
+
+ht_iter_t *ht_iter_next (ht_iter_t *iter) {
+    volatile entry_t *ent;
+    uint64_t key;
+    uint64_t val;
+    uint64_t table_size = (1 << iter->hti->scale);
+    do {
+        if (++iter->idx == table_size) {
+            ht_iter_free(iter);
+            return NULL;
+        }
+        ent = &iter->hti->table[++iter->idx];
+        key = ent->key;
+        val = ent->val;
+
+    } while (key == DOES_NOT_EXIST || val == DOES_NOT_EXIST || val == TOMBSTONE);
+
+    iter->key = key;
+    if (val == COPIED_VALUE) {
+        uint32_t hash = (iter->hti->ht->key_type == NULL) 
+                      ? murmur32_8b(key)
+                      : iter->hti->ht->key_type->hash((void *)key);
+        iter->val = hti_get(iter->hti->next, (void *)ent->key, hash);
+    } else {
+        iter->val = val;
+    }
+
+    return iter;
+}
+
+uint64_t ht_iter_val (ht_iter_t *iter) {
+    return iter->val;
+}
+
+uint64_t ht_iter_key (ht_iter_t *iter) {
+    return iter->key;
+}
+
+void ht_iter_free (ht_iter_t *iter) {
+    SYNC_ADD(&iter->hti->references, -1);
+}
+
index a94d2d00594274b2e2b7b661b697667ab8802d26..9e66787c2be4ef54c4c9b043b691b203d71d820f 100644 (file)
 #include "list.h"
 #include "mem.h"
 
-typedef struct node {
+typedef struct ll_iter node_t;
+
+struct ll_iter {
     void *key;
     uint64_t val;
-    struct node *next;
-} node_t;
+    node_t *next;
+};
 
 struct ll {
     node_t *head;
@@ -304,3 +306,30 @@ void ll_print (list_t *ll) {
     }
     printf("\n");
 }
+
+ll_iter_t *ll_iter_start (list_t *ll, void *key) {
+    node_t *item;
+    find_pred(NULL, &item, ll, key, FALSE);
+    return item;
+}
+
+ll_iter_t *ll_iter_next (ll_iter_t *iter) {
+    assert(iter);
+    if (EXPECT_FALSE(!iter))
+        return NULL;
+
+    node_t *next = iter->next;
+    while (next != NULL && IS_TAGGED(next->next, TAG1)) {
+        next = (node_t *)STRIP_TAG(next->next, TAG1);
+    }
+
+    return next;
+}
+
+uint64_t ll_iter_val (ll_iter_t *iter) {
+    return iter->val;
+}
+
+void *ll_iter_key (ll_iter_t *iter) {
+    return iter->key;
+}
index 62506e146e88aa5a5d58665d7524890fa3357ed5..16f7538b9d3f3304d885bf1fb944bea6b2129bb9 100644 (file)
 // Setting MAX_LEVEL to 0 essentially makes this data structure the Harris-Michael lock-free list (in list.c).
 #define MAX_LEVEL 31
 
-typedef struct node {
+typedef struct sl_iter node_t;
+
+struct sl_iter {
     void *key;
     uint64_t val;
     int top_level;
-    struct node *next[];
-} node_t;
+    node_t *next[];
+};
 
 struct sl {
     node_t *head;
@@ -472,3 +474,30 @@ void sl_print (skiplist_t *sl) {
         }
     }
 }
+
+sl_iter_t *sl_iter_start (skiplist_t *sl, void *key) {
+    node_t *item;
+    find_preds(NULL, &item, 0, sl, key, FALSE);
+    return item;
+}
+
+sl_iter_t *sl_iter_next (sl_iter_t *iter) {
+    assert(iter);
+    if (EXPECT_FALSE(!iter))
+        return NULL;
+
+    node_t *next = iter->next[0];
+    while (next != NULL && IS_TAGGED(next->next[0], TAG1)) {
+        next = (node_t *)STRIP_TAG(next->next[0], TAG1);
+    }
+
+    return next;
+}
+
+uint64_t sl_iter_val (sl_iter_t *iter) {
+    return iter->val;
+}
+
+void *sl_iter_key (sl_iter_t *iter) {
+    return iter->key;
+}
index 74d5cee2494a641995254cc35c1fee412da05d04..b721f2d8a353699b27209bc0411842406693cc48 100644 (file)
@@ -21,7 +21,7 @@ void test1 (CuTest* tc) {
     ASSERT_EQUAL( 3, tm_get(t1, k1) );
     ASSERT_EQUAL( 4, tm_get(t2, k1) );
     ASSERT_EQUAL( TXN_VALIDATED, txn_commit(t2));
-    ASSERT_EQUAL( TXN_ABORTED, txn_commit(t1));
+    ASSERT_EQUAL( TXN_ABORTED,   txn_commit(t1));
 }
 
 int main (void) {
index ea3d6df4ccbb5729fc0447218979b0bafbee703e..26e0209408c4c6a72ceda7916fcec3634d12197b 100644 (file)
--- a/txn/txn.c
+++ b/txn/txn.c
@@ -53,6 +53,7 @@ void txn_init (void) {
 // complete validating. It must be finished before we can decide to rollback or commit.
 //
 static txn_state_e tm_validate_key (txn_t *txn, void *key) {
+    assert(txn->state != TXN_RUNNING);
     
     update_t *update = (update_t *) map_get(txn->map, key);
     for (; update != NULL; update = update->next) {
@@ -111,6 +112,7 @@ static txn_state_e tm_validate_key (txn_t *txn, void *key) {
 }
 
 static txn_state_e txn_validate (txn_t *txn) {
+    assert(txn->state != TXN_RUNNING);
     int i;
     switch (txn->state) {
 
@@ -126,6 +128,7 @@ static txn_state_e txn_validate (txn_t *txn) {
                     txn->state = TXN_ABORTED;
                     break;
                 }
+                assert(s == TXN_VALIDATED);
             }
             if (txn->state == TXN_VALIDATING) {
                 txn->state =  TXN_VALIDATED;
@@ -186,6 +189,8 @@ txn_t *txn_begin (txn_type_e type, map_t *map) {
 }
 
 void txn_abort (txn_t *txn) {
+    if (txn->state != TXN_RUNNING)
+        return; // TODO: return some sort of error code
 
     int i;
     for (i = 0; i < txn->writes_count; ++i) {
@@ -198,6 +203,8 @@ void txn_abort (txn_t *txn) {
 }
 
 txn_state_e txn_commit (txn_t *txn) {
+    if (txn->state != TXN_RUNNING)
+        return txn->state; // TODO: return some sort of error code
 
     assert(txn->state == TXN_RUNNING);
     txn->state = TXN_VALIDATING;
@@ -231,6 +238,8 @@ txn_state_e txn_commit (txn_t *txn) {
 
 // Get most recent committed version prior to our read version.
 uint64_t tm_get (txn_t *txn, void *key) {
+    if (txn->state != TXN_RUNNING)
+        return ERROR_TXN_NOT_RUNNING;
 
     update_t *newest_update = (update_t *) map_get(txn->map, key);
     if (!IS_TAGGED(newest_update, TAG2))
@@ -341,6 +350,8 @@ uint64_t tm_get (txn_t *txn, void *key) {
 }
 
 void tm_set (txn_t *txn, void *key, uint64_t value) {
+    if (txn->state != TXN_RUNNING)
+        return; // TODO: return some sort of error code
 
     // create a new update record
     update_t *update = alloc_update_rec();