From: jdybnis Date: Tue, 16 Dec 2008 02:28:41 +0000 (+0000) Subject: Warning tests are failing in this version X-Git-Url: https://pd.if.org/git/?p=nbds;a=commitdiff_plain;h=2cce67f0002cdb6dcdc2ab8ccf837e3d2b3336de Warning tests are failing in this version --- diff --git a/include/common.h b/include/common.h index 0d493c5..9abf743 100644 --- a/include/common.h +++ b/include/common.h @@ -34,9 +34,9 @@ #define TAG1 (1ULL << 63) #define TAG2 (1ULL << 62) -#define TAG_VALUE(v, tag) ((uint64_t)(v) | tag) -#define IS_TAGGED(v, tag) ((uint64_t)(v) & tag) -#define STRIP_TAG(v, tag) ((uint64_t)(v) & ~tag) +#define TAG_VALUE(v, tag) ((v) | tag) +#define IS_TAGGED(v, tag) ((v) & tag) +#define STRIP_TAG(v, tag) ((v) & ~tag) #define DOES_NOT_EXIST 0 #define ERROR_INVALID_OPTION (-1) diff --git a/include/lwt.h b/include/lwt.h index dd98369..d31c9eb 100644 --- a/include/lwt.h +++ b/include/lwt.h @@ -37,9 +37,9 @@ static inline void lwt_trace (const char *flag, const char *format, size_t value extern char flag_state_[256]; if (EXPECT_FALSE(flag_state_[(unsigned)flag[0]] >= flag[1])) { // embed in so we don't have to make the lwt_record_t any bigger than it already is - format = (const char *)((size_t)format | ((uint64_t)flag[0] << 56) | ((uint64_t)flag[1] << 48)); - extern void lwt_trace_i (const char *format, size_t value1, size_t value2); - lwt_trace_i(format, value1, value2); + uint64_t f = ((uint64_t)(size_t)format | ((uint64_t)flag[0] << 56) | ((uint64_t)flag[1] << 48)); + extern void lwt_trace_i (uint64_t format, size_t value1, size_t value2); + lwt_trace_i(f, value1, value2); } } diff --git a/include/map.h b/include/map.h index 33092d3..b749744 100644 --- a/include/map.h +++ b/include/map.h @@ -7,7 +7,7 @@ typedef struct map map_t; typedef struct map_iter map_iter_t; typedef struct map_impl map_impl_t; -typedef void * map_key_t; +typedef uint64_t map_key_t; typedef uint64_t map_val_t; map_t * map_alloc (const map_impl_t *map_impl, const datatype_t *key_type); diff --git a/include/murmur.h b/include/murmur.h index 1d79652..62e26dc 100644 --- a/include/murmur.h +++ b/include/murmur.h @@ -73,11 +73,10 @@ static inline unsigned int murmur32_8b (uint64_t key) // Initialize the hash to a 'random' value unsigned int h = 8; - const unsigned char *data = (const unsigned char *)key; + const unsigned char *data = (const unsigned char *)&key; - uint32_t k1 = *(uint32_t *)&data; - data += 4; - uint32_t k2 = *(uint32_t *)&data; + uint32_t k1 = *(uint32_t *)data; + uint32_t k2 = *(uint32_t *)(data + 4); k1 *= m; k1 ^= k1 >> r; diff --git a/include/txn.h b/include/txn.h index 339a3e5..34a3857 100644 --- a/include/txn.h +++ b/include/txn.h @@ -17,7 +17,7 @@ txn_t * txn_begin (map_t *map); void txn_abort (txn_t *txn); txn_state_e txn_commit (txn_t *txn); -uint64_t txn_map_get (txn_t *txn, void *key); -void txn_map_set (txn_t *txn, void *key, uint64_t value); +uint64_t txn_map_get (txn_t *txn, map_key_t key); +void txn_map_set (txn_t *txn, map_key_t key, map_val_t value); #endif//TXN_H diff --git a/map/hashtable.c b/map/hashtable.c index 64f7a47..a757cfc 100644 --- a/map/hashtable.c +++ b/map/hashtable.c @@ -18,7 +18,7 @@ #include "mem.h" #include "hashtable.h" -#define GET_PTR(x) ((void *)((x) & MASK(48))) // low-order 48 bits is a pointer to a nstring_t +#define GET_PTR(x) ((void *)(size_t)((x) & MASK(48))) // low-order 48 bits is a pointer to a nstring_t typedef struct entry { uint64_t key; @@ -106,9 +106,11 @@ static volatile entry_t *hti_lookup (hti_t *hti, map_key_t key, uint32_t key_has // The key in is made up of two parts. The 48 low-order bits are a pointer. The // high-order 16 bits are taken from the hash. The bits from the hash are used as a // quick check to rule out non-equal keys without doing a complete compare. - if ((key_hash >> 16) == (ent_key >> 48) && hti->ht->key_type->cmp(GET_PTR(ent_key), key) == 0) { - TRACE("h1", "hti_lookup: found entry %p with key %p", ent, GET_PTR(ent_key)); - return ent; + if ((key_hash >> 16) == (ent_key >> 48)) { + if (hti->ht->key_type->cmp(GET_PTR(ent_key), (void *)(size_t)key) == 0) { + TRACE("h1", "hti_lookup: found entry %p with key %p", ent, GET_PTR(ent_key)); + return ent; + } } } } @@ -214,14 +216,14 @@ static int hti_copy_entry (hti_t *ht1, volatile entry_t *ht1_ent, uint32_t key_h // Install the key in the new table. uint64_t ht1_ent_key = ht1_ent->key; - map_key_t key = (ht1->ht->key_type == NULL) ? (void *)ht1_ent_key : GET_PTR(ht1_ent_key); + map_key_t key = (ht1->ht->key_type == NULL) ? (map_key_t)ht1_ent_key : (map_key_t)(size_t)GET_PTR(ht1_ent_key); // The old table's dead entries don't need to be copied to the new table, but their keys need to be freed. assert(COPIED_VALUE == TAG_VALUE(TOMBSTONE, TAG1)); if (ht1_ent_val == TOMBSTONE) { TRACE("h1", "hti_copy_entry: entry %p old value was deleted, now freeing key %p", ht1_ent, key); if (EXPECT_FALSE(ht1->ht->key_type != NULL)) { - nbd_defer_free(key); + nbd_defer_free((void *)(size_t)key); } return TRUE; } @@ -229,7 +231,9 @@ static int hti_copy_entry (hti_t *ht1, volatile entry_t *ht1_ent, uint32_t key_h // We use 0 to indicate that is uninitiallized. Occasionally the key's hash will really be 0 and we // waste time recomputing it every time. It is rare enough (1 in 65k) that it won't hurt performance. if (key_hash == 0) { - key_hash = (ht1->ht->key_type == NULL) ? murmur32_8b(ht1_ent_key) : ht1->ht->key_type->hash(key); + key_hash = (ht1->ht->key_type == NULL) + ? murmur32_8b(ht1_ent_key) + : ht1->ht->key_type->hash((void *)(size_t)key); } int ht2_ent_is_empty; @@ -323,7 +327,7 @@ static map_val_t hti_cas (hti_t *hti, map_key_t key, uint32_t key_hash, map_val_ return DOES_NOT_EXIST; // Allocate . - uint64_t new_key = (uint64_t)((hti->ht->key_type == NULL) ? key : hti->ht->key_type->clone(key)); + uint64_t new_key = (uint64_t)((hti->ht->key_type == NULL) ? key : (map_key_t)(size_t)hti->ht->key_type->clone((void *)(size_t)key)); if (EXPECT_FALSE(hti->ht->key_type != NULL)) { // Combine pointer with bits from its hash new_key = ((uint64_t)(key_hash >> 16) << 48) | new_key; @@ -432,7 +436,7 @@ static map_val_t hti_get (hti_t *hti, map_key_t key, uint32_t key_hash) { // map_val_t ht_get (hashtable_t *ht, map_key_t key) { - uint32_t hash = (ht->key_type == NULL) ? murmur32_8b((uint64_t)key) : ht->key_type->hash(key); + uint32_t hash = (ht->key_type == NULL) ? murmur32_8b((uint64_t)key) : ht->key_type->hash((void *)(size_t)key); return hti_get(ht->hti, key, hash); } @@ -508,7 +512,7 @@ map_val_t ht_cas (hashtable_t *ht, map_key_t key, map_val_t expected_val, map_va } map_val_t old_val; - uint32_t key_hash = (ht->key_type == NULL) ? murmur32_8b((uint64_t)key) : ht->key_type->hash(key); + uint32_t key_hash = (ht->key_type == NULL) ? murmur32_8b((uint64_t)key) : ht->key_type->hash((void *)(size_t)key); while ((old_val = hti_cas(hti, key, key_hash, expected_val, new_val)) == COPIED_VALUE) { assert(hti->next); hti = hti->next; @@ -522,7 +526,7 @@ map_val_t ht_cas (hashtable_t *ht, map_key_t key, map_val_t expected_val, map_va map_val_t ht_remove (hashtable_t *ht, map_key_t key) { hti_t *hti = ht->hti; map_val_t val; - uint32_t key_hash = (ht->key_type == NULL) ? murmur32_8b((uint64_t)key) : ht->key_type->hash(key); + uint32_t key_hash = (ht->key_type == NULL) ? murmur32_8b((uint64_t)key) : ht->key_type->hash((void *)(size_t)key); do { val = hti_cas(hti, key, key_hash, CAS_EXPECT_WHATEVER, DOES_NOT_EXIST); if (val != COPIED_VALUE) @@ -576,7 +580,7 @@ void ht_print (hashtable_t *ht) { printf("hti:%p scale:%u count:%d copied:%d\n", hti, hti->scale, hti->count, hti->num_entries_copied); for (int i = 0; i < (1 << hti->scale); ++i) { volatile entry_t *ent = hti->table + i; - printf("[0x%x] %p:%p\n", i, (void *)ent->key, (void *)ent->val); + printf("[0x%x] 0x%llx:0x%llx\n", i, (uint64_t)ent->key, ent->val); if (i > 30) { printf("...\n"); break; @@ -587,7 +591,6 @@ void ht_print (hashtable_t *ht) { } ht_iter_t *ht_iter_begin (hashtable_t *ht, map_key_t key) { - assert(key == NULL); hti_t *hti = ht->hti; int rcount; do { @@ -623,7 +626,7 @@ map_val_t ht_iter_next (ht_iter_t *iter, map_key_t *key_ptr) { return DOES_NOT_EXIST; } ent = &iter->hti->table[iter->idx]; - key = (iter->hti->ht->key_type == NULL) ? (map_key_t)ent->key : GET_PTR(ent->key); + key = (iter->hti->ht->key_type == NULL) ? (map_key_t)ent->key : (map_key_t)(size_t)GET_PTR(ent->key); val = ent->val; } while (key == DOES_NOT_EXIST || val == DOES_NOT_EXIST || val == TOMBSTONE); @@ -634,7 +637,7 @@ map_val_t ht_iter_next (ht_iter_t *iter, map_key_t *key_ptr) { if (val == COPIED_VALUE) { uint32_t hash = (iter->hti->ht->key_type == NULL) ? murmur32_8b((uint64_t)key) - : iter->hti->ht->key_type->hash(key); + : iter->hti->ht->key_type->hash((void *)(size_t)key); val = hti_get(iter->hti->next, (map_key_t)ent->key, hash); } diff --git a/map/list.c b/map/list.c index b9df598..250e406 100644 --- a/map/list.c +++ b/map/list.c @@ -13,11 +13,13 @@ #include "list.h" #include "mem.h" -typedef struct ll_iter node_t; - -struct ll_iter { +typedef struct node { map_key_t key; map_val_t val; + uint64_t next; // next node +} node_t; + +struct ll_iter { node_t *next; }; @@ -36,15 +38,15 @@ static node_t *node_alloc (map_key_t key, map_val_t val) { list_t *ll_alloc (const datatype_t *key_type) { list_t *ll = (list_t *)nbd_malloc(sizeof(list_t)); ll->key_type = key_type; - ll->head = node_alloc(NULL, 0); - ll->head->next = NULL; + ll->head = node_alloc(0, 0); + ll->head->next = DOES_NOT_EXIST; return ll; } void ll_free (list_t *ll) { - node_t *item = ll->head->next; + node_t *item = (node_t *)(size_t)ll->head->next; // the head can't be tagged while (item) { - node_t *next = (node_t *)STRIP_TAG(item->next, TAG1); + node_t *next = (node_t *)(size_t)STRIP_TAG(item->next, TAG1); nbd_free(item); item = next; } @@ -52,30 +54,30 @@ void ll_free (list_t *ll) { uint64_t ll_count (list_t *ll) { uint64_t count = 0; - node_t *item = ll->head->next; + node_t *item = (node_t *)(size_t)ll->head->next; while (item) { if (!IS_TAGGED(item->next, TAG1)) { count++; } - item = (node_t *)STRIP_TAG(item->next, TAG1); + item = (node_t *)(size_t)STRIP_TAG(item->next, TAG1); } return count; } static int find_pred (node_t **pred_ptr, node_t **item_ptr, list_t *ll, map_key_t key, int help_remove) { node_t *pred = ll->head; - node_t *item = pred->next; + node_t *item = (node_t *)(size_t)pred->next; TRACE("l2", "find_pred: searching for key %p in list (head is %p)", key, pred); while (item != NULL) { - node_t *next = item->next; + uint64_t next = item->next; // A tag means an item is logically removed but not physically unlinked yet. while (EXPECT_FALSE(IS_TAGGED(next, TAG1))) { // Skip over logically removed items. if (!help_remove) { - item = (node_t *)STRIP_TAG(item->next, TAG1); + item = (node_t *)(size_t)STRIP_TAG(item->next, TAG1); if (EXPECT_FALSE(item == NULL)) break; TRACE("l3", "find_pred: skipping marked item %p (next is %p)", item, next); @@ -84,30 +86,27 @@ static int find_pred (node_t **pred_ptr, node_t **item_ptr, list_t *ll, map_key_ } // Unlink logically removed items. - node_t *other; TRACE("l3", "find_pred: unlinking marked item %p next is %p", item, next); - if ((other = SYNC_CAS(&pred->next, item, STRIP_TAG(next, TAG1))) == item) { + + uint64_t other = SYNC_CAS(&pred->next, (uint64_t)(size_t)item, STRIP_TAG(next, TAG1)); + if (other == (uint64_t)(size_t)item) { TRACE("l2", "find_pred: unlinked item %p from pred %p", item, pred); - item = (node_t *)STRIP_TAG(next, TAG1); - if (EXPECT_FALSE(item == NULL)) - break; - next = item->next; + item = (node_t *)(size_t)STRIP_TAG(next, TAG1); + next = (item != NULL) ? item->next : DOES_NOT_EXIST; TRACE("l3", "find_pred: now current item is %p next is %p", item, next); // The thread that completes the unlink should free the memory. if (ll->key_type != NULL) { - nbd_defer_free((void*)other->key); + nbd_defer_free((void *)(size_t)((node_t *)(size_t)other)->key); } - nbd_defer_free(other); + nbd_defer_free(((node_t *)(size_t)other)); } else { TRACE("l2", "find_pred: lost a race to unlink item %p from pred %p", item, pred); TRACE("l2", "find_pred: pred's link changed to %p", other, 0); if (IS_TAGGED(other, TAG1)) return find_pred(pred_ptr, item_ptr, ll, key, help_remove); // retry - item = other; - if (EXPECT_FALSE(item == NULL)) - break; - next = item->next; + item = (node_t *)(size_t)other; + next = (item != NULL) ? item->next : DOES_NOT_EXIST; } } @@ -121,7 +120,12 @@ static int find_pred (node_t **pred_ptr, node_t **item_ptr, list_t *ll, map_key_ if (EXPECT_TRUE(ll->key_type == NULL)) { d = (uint64_t)item->key - (uint64_t)key; } else { - d = ll->key_type->cmp(item->key, key); + d = ll->key_type->cmp((void *)(size_t)item->key, (void *)(size_t)key); + } + + if (next != DOES_NOT_EXIST && ((node_t *)next)->key < item->key) { + lwt_halt(); + assert(0); } // If we reached the key (or passed where it should be), we found the right predesssor @@ -139,7 +143,7 @@ static int find_pred (node_t **pred_ptr, node_t **item_ptr, list_t *ll, map_key_ } pred = item; - item = next; + item = (node_t *)(size_t)next; } // is not in . @@ -190,10 +194,12 @@ map_val_t ll_cas (list_t *ll, map_key_t key, map_val_t expectation, map_val_t ne // Create a new item and insert it into the list. TRACE("l2", "ll_cas: attempting to insert item between %p and %p", pred, pred->next); - map_key_t new_key = (ll->key_type == NULL) ? key : ll->key_type->clone(key); + map_key_t new_key = (ll->key_type == NULL) + ? key + : (map_key_t)(size_t)ll->key_type->clone((void *)(size_t)key); node_t *new_item = node_alloc(new_key, new_val); - node_t *next = new_item->next = old_item; - node_t *other = SYNC_CAS(&pred->next, next, new_item); + uint64_t next = new_item->next = (uint64_t)(size_t)old_item; + uint64_t other = SYNC_CAS(&pred->next, next, new_item); if (other == next) { TRACE("l1", "ll_cas: successfully inserted new item %p", new_item, 0); return DOES_NOT_EXIST; // success @@ -202,7 +208,7 @@ map_val_t ll_cas (list_t *ll, map_key_t key, map_val_t expectation, map_val_t ne // Lost a race. Failed to insert the new item into the list. TRACE("l1", "ll_cas: lost a race. CAS failed. expected pred's link to be %p but found %p", next, other); if (ll->key_type != NULL) { - nbd_free(new_key); + nbd_free((void *)(size_t)new_key); } nbd_free(new_item); continue; // retry @@ -250,10 +256,10 @@ map_val_t ll_remove (list_t *ll, map_key_t key) { } // Mark removed. If multiple threads try to remove the same item only one of them should succeed. - node_t *next; - node_t *old_next = item->next; + uint64_t next; + uint64_t old_next = item->next; do { - next = old_next; + next = old_next; old_next = SYNC_CAS(&item->next, next, TAG_VALUE(next, TAG1)); if (IS_TAGGED(old_next, TAG1)) { TRACE("l1", "ll_remove: lost a race -- %p is already marked for removal by another thread", item, 0); @@ -261,7 +267,7 @@ map_val_t ll_remove (list_t *ll, map_key_t key) { } } while (next != old_next); TRACE("l2", "ll_remove: logically removed item %p", item, 0); - ASSERT(IS_TAGGED(item->next, TAG1)); + ASSERT(IS_TAGGED(((volatile node_t *)item)->next, TAG1)); // Atomically swap out the item's value in case another thread is updating the item while we are // removing it. This establishes which operation occurs first logically, the update or the remove. @@ -272,15 +278,15 @@ map_val_t ll_remove (list_t *ll, map_key_t key) { // item logically removed for a later call (or some other thread) to physically unlink. By marking the // item earlier, we logically removed it. TRACE("l2", "ll_remove: unlink the item by linking its pred %p to its successor %p", pred, next); - node_t *other; - if ((other = SYNC_CAS(&pred->next, item, next)) != item) { + uint64_t other; + if ((other = SYNC_CAS(&pred->next, (uint64_t)(size_t)item, next)) != (uint64_t)(size_t)item) { TRACE("l1", "ll_remove: unlink failed; pred's link changed from %p to %p", item, other); return val; } // The thread that completes the unlink should free the memory. if (ll->key_type != NULL) { - nbd_defer_free(item->key); + nbd_defer_free((void *)(size_t)item->key); } nbd_defer_free(item); TRACE("l1", "ll_remove: successfully unlinked item %p from the list", item, 0); @@ -288,27 +294,28 @@ map_val_t ll_remove (list_t *ll, map_key_t key) { } void ll_print (list_t *ll) { - node_t *item; - item = ll->head->next; + uint64_t next = ll->head->next; int i = 0; - while (item) { - node_t *next = item->next; - if (IS_TAGGED(item, TAG1)) { + while (next != DOES_NOT_EXIST) { + if (IS_TAGGED(next, TAG1)) { printf("*"); } - printf("%p:%p ", item, item->key); + node_t *item = (node_t *)(size_t)STRIP_TAG(next, TAG1); + if (item == NULL) + break; + printf("%p:0x%llx ", item, item->key); fflush(stdout); - item = (node_t *)STRIP_TAG(next, TAG1); if (i++ > 30) { printf("..."); break; } + next = item->next; } printf("\n"); } ll_iter_t *ll_iter_begin (list_t *ll, map_key_t key) { - node_t *iter = node_alloc(0,0); + ll_iter_t *iter = (ll_iter_t *)nbd_malloc(sizeof(ll_iter_t)); find_pred(NULL, &iter->next, ll, key, FALSE); return iter; } @@ -317,13 +324,13 @@ map_val_t ll_iter_next (ll_iter_t *iter, map_key_t *key_ptr) { assert(iter); node_t *item = iter->next; while (item != NULL && IS_TAGGED(item->next, TAG1)) { - item = (node_t *)STRIP_TAG(item->next, TAG1); + item = (node_t *)(size_t)STRIP_TAG(item->next, TAG1); } if (item == NULL) { iter->next = NULL; return DOES_NOT_EXIST; } - iter->next = item->next; + iter->next = (node_t *)(size_t)STRIP_TAG(item->next, TAG1); if (key_ptr != NULL) { *key_ptr = item->key; } diff --git a/map/skiplist.c b/map/skiplist.c index afdb1f7..c2777a9 100644 --- a/map/skiplist.c +++ b/map/skiplist.c @@ -28,13 +28,15 @@ // Setting MAX_LEVEL to 0 essentially makes this data structure the Harris-Michael lock-free list (in list.c). #define MAX_LEVEL 31 -typedef struct sl_iter node_t; - -struct sl_iter { +typedef struct node { map_key_t key; map_val_t val; int top_level; - node_t *next[]; + uint64_t next[]; +} node_t; + +struct sl_iter { + node_t *next; }; struct sl { @@ -68,15 +70,18 @@ static node_t *node_alloc (int level, map_key_t key, map_val_t val) { skiplist_t *sl_alloc (const datatype_t *key_type) { skiplist_t *sl = (skiplist_t *)nbd_malloc(sizeof(skiplist_t)); sl->key_type = key_type; - sl->head = node_alloc(MAX_LEVEL, NULL, 0); + sl->head = node_alloc(MAX_LEVEL, 0, 0); memset(sl->head->next, 0, (MAX_LEVEL+1) * sizeof(skiplist_t *)); return sl; } void sl_free (skiplist_t *sl) { - node_t *item = sl->head->next[0]; + node_t *item = (node_t *)(size_t)sl->head->next[0]; while (item) { - node_t *next = (node_t *)STRIP_TAG(item->next[0], TAG1); + node_t *next = (node_t *)(size_t)STRIP_TAG(item->next[0], TAG1); + if (sl->key_type != NULL) { + nbd_free((void *)(size_t)item->key); + } nbd_free(item); item = next; } @@ -84,12 +89,12 @@ void sl_free (skiplist_t *sl) { uint64_t sl_count (skiplist_t *sl) { uint64_t count = 0; - node_t *item = sl->head->next[0]; + node_t *item = (node_t *)(size_t)sl->head->next[0]; while (item) { if (!IS_TAGGED(item->next[0], TAG1)) { count++; } - item = (node_t *)STRIP_TAG(item->next[0], TAG1); + item = (node_t *)(size_t)STRIP_TAG(item->next[0], TAG1); } return count; } @@ -103,7 +108,7 @@ static node_t *find_preds (node_t **preds, node_t **succs, int n, skiplist_t *sl #if MAX_LEVEL > 2 // Optimization for small lists. No need to traverse empty higher levels. start_level = 2; - while (pred->next[start_level+1] != NULL) { + while (pred->next[start_level+1] != DOES_NOT_EXIST) { start_level += start_level - 1; if (EXPECT_FALSE(start_level >= MAX_LEVEL)) { start_level = MAX_LEVEL; @@ -118,53 +123,50 @@ static node_t *find_preds (node_t **preds, node_t **succs, int n, skiplist_t *sl // Traverse the levels of from the top level to the bottom for (int level = start_level; level >= 0; --level) { TRACE("s3", "find_preds: level %llu", level, 0); - item = pred->next[level]; - if (EXPECT_FALSE(IS_TAGGED(item, TAG1))) { - TRACE("s2", "find_preds: pred %p is marked for removal (item %p); retry", pred, item); + uint64_t next = pred->next[level]; + if (EXPECT_FALSE(IS_TAGGED(next, TAG1))) { + TRACE("s2", "find_preds: pred %p is marked for removal (next %p); retry", pred, next); return find_preds(preds, succs, n, sl, key, help_remove); // retry } - while (item != NULL) { - node_t *next = item->next[level]; + while (next != DOES_NOT_EXIST) { + item = (node_t *)(size_t)next; + next = item->next[level]; // A tag means an item is logically removed but not physically unlinked yet. while (EXPECT_FALSE(IS_TAGGED(next, TAG1))) { // Skip over logically removed items. if (!help_remove) { - item = (node_t *)STRIP_TAG(item->next, TAG1); + item = (node_t *)(size_t)STRIP_TAG(next, TAG1); if (EXPECT_FALSE(item == NULL)) break; - TRACE("s3", "find_preds: skipping marked item %p (next is %p)", item, next); next = item->next[level]; + TRACE("s3", "find_preds: skipping marked item %p (next is 0x%llx)", item, next); continue; } // Unlink logically removed items. - node_t *other; - TRACE("s3", "find_preds: unlinking marked item %p; next is %p", item, next); - if ((other = SYNC_CAS(&pred->next[level], item, STRIP_TAG(next, TAG1))) == item) { - item = (node_t *)STRIP_TAG(next, TAG1); - if (EXPECT_FALSE(item == NULL)) - break; - next = item->next[level]; - TRACE("s3", "find_preds: now the current item is %p next is %p", item, next); + TRACE("s3", "find_preds: unlinking marked item %p; next is 0x%llx", item, next); + uint64_t other = SYNC_CAS(&pred->next[level], (uint64_t)(size_t)item, STRIP_TAG(next, TAG1)); + if (other == (uint64_t)(size_t)item) { + item = (node_t *)(size_t)STRIP_TAG(next, TAG1); + next = (item != NULL) ? item->next[level] : DOES_NOT_EXIST; + TRACE("s3", "find_preds: now the current item is %p next is 0x%llx", item, next); // The thread that completes the unlink should free the memory. if (level == 0) { if (sl->key_type != NULL) { - nbd_defer_free((void*)other->key); + nbd_defer_free((void *)(size_t)((node_t *)(size_t)other)->key); } - nbd_defer_free(other); + nbd_defer_free(((node_t *)(size_t)other)); } } else { TRACE("s3", "find_preds: lost race to unlink item %p from pred %p", item, pred); TRACE("s3", "find_preds: pred's link changed to %p", other, 0); if (IS_TAGGED(other, TAG1)) return find_preds(preds, succs, n, sl, key, help_remove); // retry - item = other; - if (EXPECT_FALSE(item == NULL)) - break; - next = item->next[level]; + item = (node_t *)(size_t)other; + next = (item != NULL) ? item->next[level] : DOES_NOT_EXIST; } } @@ -177,7 +179,7 @@ static node_t *find_preds (node_t **preds, node_t **succs, int n, skiplist_t *sl if (EXPECT_TRUE(sl->key_type == NULL)) { d = (uint64_t)item->key - (uint64_t)key; } else { - d = sl->key_type->cmp(item->key, key); + d = sl->key_type->cmp((void *)(size_t)item->key, (void *)(size_t)key); } if (d >= 0) { @@ -186,7 +188,7 @@ static node_t *find_preds (node_t **preds, node_t **succs, int n, skiplist_t *sl } pred = item; - item = next; + item = (node_t *)(size_t)next; } // The cast to unsigned is for the case when n is -1. @@ -202,6 +204,7 @@ static node_t *find_preds (node_t **preds, node_t **succs, int n, skiplist_t *sl // fill in empty levels if (n == -1 && item != NULL) { + assert(item->top_level <= MAX_LEVEL); for (int level = start_level + 1; level <= item->top_level; ++level) { preds[level] = sl->head; } @@ -234,12 +237,12 @@ map_val_t sl_lookup (skiplist_t *sl, map_key_t key) { } map_key_t sl_min_key (skiplist_t *sl) { - node_t *item = sl->head->next[0]; + node_t *item = (node_t *)(size_t)sl->head->next[0]; while (item != NULL) { - node_t *next = item->next[0]; + uint64_t next = item->next[0]; if (!IS_TAGGED(next, TAG1)) return item->key; - item = (node_t *)STRIP_TAG(next, TAG1); + item = (node_t *)(size_t)STRIP_TAG(next, TAG1); } return DOES_NOT_EXIST; } @@ -267,21 +270,23 @@ map_val_t sl_cas (skiplist_t *sl, map_key_t key, map_val_t expectation, map_val_ // First insert into the bottom level. TRACE("s3", "sl_cas: attempting to insert item between %p and %p", preds[0], nexts[0]); - map_key_t new_key = (sl->key_type == NULL) ? key : sl->key_type->clone(key); + map_key_t new_key = (sl->key_type == NULL) + ? key + : (map_key_t)(size_t)sl->key_type->clone((void *)(size_t)key); new_item = node_alloc(n, new_key, new_val); node_t *pred = preds[0]; - node_t *next = new_item->next[0] = nexts[0]; + uint64_t next = new_item->next[0] = (uint64_t)(size_t)nexts[0]; for (int level = 1; level <= new_item->top_level; ++level) { - new_item->next[level] = nexts[level]; + new_item->next[level] = (uint64_t)(size_t)nexts[level]; } - node_t *other = SYNC_CAS(&pred->next[0], next, new_item); + uint64_t other = SYNC_CAS(&pred->next[0], next, (uint64_t)(size_t)new_item); if (other == next) { TRACE("s3", "sl_cas: successfully inserted item %p at level 0", new_item, 0); break; // success } TRACE("s3", "sl_cas: failed to change pred's link: expected %p found %p", next, other); if (sl->key_type != NULL) { - nbd_free(new_key); + nbd_free((void *)(size_t)new_key); } nbd_free(new_item); continue; @@ -320,10 +325,10 @@ map_val_t sl_cas (skiplist_t *sl, map_key_t key, map_val_t expectation, map_val_ // Link into from the bottom up. for (int level = 1; level <= new_item->top_level; ++level) { node_t *pred = preds[level]; - node_t *next = nexts[level]; + uint64_t next = (uint64_t)(size_t)nexts[level]; do { TRACE("s3", "sl_cas: attempting to insert item between %p and %p", pred, next); - node_t *other = SYNC_CAS(&pred->next[level], next, new_item); + uint64_t other = SYNC_CAS(&pred->next[level], next, (uint64_t)(size_t)new_item); if (other == next) { TRACE("s3", "sl_cas: successfully inserted item %p at level %llu", new_item, level); break; // success @@ -331,12 +336,12 @@ map_val_t sl_cas (skiplist_t *sl, map_key_t key, map_val_t expectation, map_val_ TRACE("s3", "sl_cas: failed to change pred's link: expected %p found %p", next, other); find_preds(preds, nexts, new_item->top_level, sl, key, TRUE); pred = preds[level]; - next = nexts[level]; + next = (uint64_t)(size_t)nexts[level]; // Update 's next pointer do { // There in no need to continue linking in the item if another thread removed it. - node_t *old_next = ((volatile node_t *)new_item)->next[level]; + uint64_t old_next = ((volatile node_t *)new_item)->next[level]; if (IS_TAGGED(old_next, TAG1)) return DOES_NOT_EXIST; // success @@ -361,8 +366,8 @@ map_val_t sl_remove (skiplist_t *sl, map_key_t key) { // Mark and unlink at each level of from the top down. If multiple threads try to concurrently remove // the same item only one of them should succeed. Marking the bottom level establishes which of them succeeds. for (int level = item->top_level; level > 0; --level) { - node_t *next; - node_t *old_next = item->next[level]; + uint64_t next; + uint64_t old_next = item->next[level]; do { next = old_next; old_next = SYNC_CAS(&item->next[level], next, TAG_VALUE(next, TAG1)); @@ -374,20 +379,21 @@ map_val_t sl_remove (skiplist_t *sl, map_key_t key) { node_t *pred = preds[level]; TRACE("s2", "sl_remove: linking the item's pred %p to the item's successor %p", pred, STRIP_TAG(next, TAG1)); - node_t *other = NULL; - if ((other = SYNC_CAS(&pred->next[level], item, STRIP_TAG(next, TAG1))) != item) { + uint64_t other = SYNC_CAS(&pred->next[level], (uint64_t)(size_t)item, STRIP_TAG(next, TAG1)); + if (other != (uint64_t)(size_t)item) { TRACE("s1", "sl_remove: unlink failed; pred's link changed from %p to %p", item, other); // If our former predecessor now points past us we know another thread unlinked us. Otherwise, we need // to search for a new set of preds. - if (other == NULL) + if (other == DOES_NOT_EXIST) continue; // points past to the end of the list; go on to the next level. int d = -1; if (!IS_TAGGED(other, TAG1)) { + map_key_t other_key = ((node_t *)(size_t)other)->key; if (EXPECT_TRUE(sl->key_type == NULL)) { - d = (uint64_t)item->key - (uint64_t)other->key; + d = (uint64_t)item->key - (uint64_t)other_key; } else { - d = sl->key_type->cmp(item->key, other->key); + d = sl->key_type->cmp((void *)(size_t)item->key, (void *)(size_t)other_key); } } if (d > 0) { @@ -399,8 +405,8 @@ map_val_t sl_remove (skiplist_t *sl, map_key_t key) { } } - node_t *next; - node_t *old_next = item->next[0]; + uint64_t next; + uint64_t old_next = item->next[0]; do { next = old_next; old_next = SYNC_CAS(&item->next[0], next, TAG_VALUE(next, TAG1)); @@ -422,7 +428,7 @@ map_val_t sl_remove (skiplist_t *sl, map_key_t key) { TRACE("s2", "sl_remove: unlinked item %p from the skiplist at level 0", item, 0); // The thread that completes the unlink should free the memory. if (sl->key_type != NULL) { - nbd_defer_free(item->key); + nbd_defer_free((void *)(size_t)item->key); } nbd_defer_free(item); } @@ -432,14 +438,14 @@ map_val_t sl_remove (skiplist_t *sl, map_key_t key) { void sl_print (skiplist_t *sl) { for (int level = MAX_LEVEL; level >= 0; --level) { node_t *item = sl->head; - if (item->next[level] == NULL) + if (item->next[level] == DOES_NOT_EXIST) continue; printf("(%d) ", level); int i = 0; while (item) { - node_t *next = item->next[level]; + uint64_t next = item->next[level]; printf("%s%p ", IS_TAGGED(next, TAG1) ? "*" : "", item); - item = (node_t *)STRIP_TAG(next, TAG1); + item = (node_t *)(size_t)STRIP_TAG(next, TAG1); if (i++ > 30) { printf("..."); break; @@ -452,22 +458,22 @@ void sl_print (skiplist_t *sl) { int i = 0; while (item) { int is_marked = IS_TAGGED(item->next[0], TAG1); - printf("%s%p:%p ", is_marked ? "*" : "", item, item->key); + printf("%s%p:0x%llx ", is_marked ? "*" : "", item, (uint64_t)(size_t)item->key); if (item != sl->head) { printf("[%d]", item->top_level); } else { printf("[HEAD]"); } for (int level = 1; level <= item->top_level; ++level) { - node_t *next = (node_t *)STRIP_TAG(item->next[level], TAG1); + node_t *next = (node_t *)(size_t)STRIP_TAG(item->next[level], TAG1); is_marked = IS_TAGGED(item->next[0], TAG1); printf(" %p%s", next, is_marked ? "*" : ""); - if (item == sl->head && item->next[level] == NULL) + if (item == sl->head && item->next[level] == DOES_NOT_EXIST) break; } printf("\n"); fflush(stdout); - item = (node_t *)STRIP_TAG(item->next[0], TAG1); + item = (node_t *)(size_t)STRIP_TAG(item->next[0], TAG1); if (i++ > 30) { printf("...\n"); break; @@ -476,22 +482,22 @@ void sl_print (skiplist_t *sl) { } sl_iter_t *sl_iter_begin (skiplist_t *sl, map_key_t key) { - node_t *iter = node_alloc(0, 0, 0); - find_preds(NULL, &iter->next[0], 0, sl, key, FALSE); + sl_iter_t *iter = (sl_iter_t *)nbd_malloc(sizeof(sl_iter_t)); + find_preds(NULL, &iter->next, 0, sl, key, FALSE); return iter; } map_val_t sl_iter_next (sl_iter_t *iter, map_key_t *key_ptr) { assert(iter); - node_t *item = iter->next[0]; + node_t *item = iter->next; while (item != NULL && IS_TAGGED(item->next[0], TAG1)) { - item = (node_t *)STRIP_TAG(item->next[0], TAG1); + item = (node_t *)(size_t)STRIP_TAG(item->next[0], TAG1); } if (item == NULL) { - iter->next[0] = NULL; + iter->next = NULL; return DOES_NOT_EXIST; } - iter->next[0] = item->next[0]; + iter->next = (node_t *)(size_t)STRIP_TAG(item->next[0], TAG1); if (key_ptr != NULL) { *key_ptr = item->key; } diff --git a/runtime/lwt.c b/runtime/lwt.c index f6bd346..0c3147f 100644 --- a/runtime/lwt.c +++ b/runtime/lwt.c @@ -18,9 +18,9 @@ volatile int halt_ = 0; typedef struct lwt_record { uint64_t timestamp; - const char *format; - size_t value1; - size_t value2; + uint64_t format; + uint64_t value1; + uint64_t value2; } lwt_record_t; typedef struct lwt_buffer { @@ -54,12 +54,12 @@ void lwt_set_trace_level (const char *flags) static inline void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset) { // print the record if its trace category is enabled at a high enough level - int flag = (size_t)r->format >> 56; - int level = ((size_t)r->format >> 48) & 0xFF; + int flag = r->format >> 56; + int level = (r->format >> 48) & 0xFF; if (flag_state_[(unsigned)flag] >= level) { char s[3] = {flag, level, '\0'}; fprintf(file, "%09llu %d %s ", ((uint64_t)r->timestamp - offset) >> 5, thread_id, s); - const char *format = (const char *)((size_t)r->format & MASK(48)); // strip out the embedded flags + const char *format = (const char *)(size_t)(r->format & MASK(48)); // strip out the embedded flags fprintf(file, format, r->value1, r->value2); fprintf(file, "\n"); } @@ -118,7 +118,7 @@ void lwt_dump (const char *file_name) } } -void lwt_trace_i (const char *format, size_t value1, size_t value2) { +void lwt_trace_i (uint64_t format, size_t value1, size_t value2) { while (halt_) {} LOCALIZE_THREAD_LOCAL(tid_, int); lwt_buffer_t *tb = lwt_buf_[tid_]; diff --git a/test/map_test1.c b/test/map_test1.c index f380918..1aa13d2 100644 --- a/test/map_test1.c +++ b/test/map_test1.c @@ -11,7 +11,7 @@ #include "skiplist.h" #include "hashtable.h" -#define NUM_ITERATIONS 10000000 +#define NUM_ITERATIONS 1000000 //#define TEST_STRING_KEYS @@ -42,9 +42,9 @@ void *worker (void *arg) { } #else if (r & (1 << 8)) { - map_set(map_, (void *)(key + 1), 1); + map_set(map_, (map_key_t)(key + 1), 1); } else { - map_remove(map_, (void *)(key + 1)); + map_remove(map_, (map_key_t)(key + 1)); } #endif @@ -56,7 +56,7 @@ void *worker (void *arg) { int main (int argc, char **argv) { nbd_init(); - lwt_set_trace_level("l3"); + lwt_set_trace_level("r0m0l3"); char* program_name = argv[0]; pthread_t thread[MAX_NUM_THREADS]; diff --git a/test/map_test2.c b/test/map_test2.c index bb06971..831ecad 100644 --- a/test/map_test2.c +++ b/test/map_test2.c @@ -34,7 +34,7 @@ typedef struct worker_data { static const map_impl_t *map_type_; static uint64_t iterator_size (map_t *map) { - map_iter_t *iter = map_iter_begin(map, NULL); + map_iter_t *iter = map_iter_begin(map, 0); uint64_t count = 0; while (map_iter_next(iter, NULL) != DOES_NOT_EXIST) { count++; @@ -54,10 +54,10 @@ void basic_test (CuTest* tc) { nstring_t *k4 = ns_alloc(3); strcpy(k1->data, "k4"); #else map_t *map = map_alloc(map_type_, NULL); - void *k1 = (void *)1; - void *k2 = (void *)2; - void *k3 = (void *)3; - void *k4 = (void *)4; + map_key_t k1 = (map_key_t)1; + map_key_t k2 = (map_key_t)2; + map_key_t k3 = (map_key_t)3; + map_key_t k4 = (map_key_t)4; #endif ASSERT_EQUAL( 0, map_count (map) ); @@ -138,7 +138,7 @@ void *add_remove_worker (void *arg) { #ifdef TEST_STRING_KEYS nstring_t *key = ns_alloc(9); #else - void *key; + map_key_t key; #endif for (int j = 0; j < 10; ++j) { @@ -147,7 +147,7 @@ void *add_remove_worker (void *arg) { memset(key->data, 0, key->len); snprintf(key->data, key->len, "%llu", i); #else - key = (void *)i; + key = (map_key_t)i; #endif TRACE("t0", "test map_add() iteration (%llu, %llu)", j, i); ASSERT_EQUAL(DOES_NOT_EXIST, map_add(map, key, d+1) ); @@ -158,7 +158,7 @@ void *add_remove_worker (void *arg) { memset(key->data, 0, key->len); snprintf(key->data, key->len, "%llu", i); #else - key = (void *)i; + key = (map_key_t)i; #endif TRACE("t0", "test map_remove() iteration (%llu, %llu)", j, i); ASSERT_EQUAL(d+1, map_remove(map, key) ); @@ -173,7 +173,8 @@ void concurrent_add_remove_test (CuTest* tc) { pthread_t thread[2]; worker_data_t wd[2]; - volatile int wait = 2; + static const int num_threads = 2; + volatile int wait = num_threads; #ifdef TEST_STRING_KEYS map_t *map = map_alloc(map_type_, &DATATYPE_NSTRING); #else @@ -185,7 +186,7 @@ void concurrent_add_remove_test (CuTest* tc) { // In 2 threads, add & remove even & odd elements concurrently int i; - for (i = 0; i < 2; ++i) { + for (i = 0; i < num_threads; ++i) { wd[i].id = i; wd[i].tc = tc; wd[i].map = map; @@ -194,14 +195,14 @@ void concurrent_add_remove_test (CuTest* tc) { if (rc != 0) { perror("nbd_thread_create"); return; } } - for (i = 0; i < 2; ++i) { + for (i = 0; i < num_threads; ++i) { pthread_join(thread[i], NULL); } gettimeofday(&tv2, NULL); int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000; map_print(map); - printf("Th:%d Time:%dms\n", 2, ms); + printf("Th:%d Time:%dms\n", num_threads, ms); fflush(stdout); // In the end, all members should be removed @@ -221,17 +222,17 @@ void basic_iteration_test (CuTest* tc) { nstring_t *y_k; #else map_t *map = map_alloc(map_type_, NULL); - void *k1 = (void *)1; - void *k2 = (void *)2; - void *x_k; - void *y_k; + map_key_t k1 = (map_key_t)1; + map_key_t k2 = (map_key_t)2; + map_key_t x_k; + map_key_t y_k; #endif ASSERT_EQUAL( DOES_NOT_EXIST, map_add (map, k1,1) ); ASSERT_EQUAL( DOES_NOT_EXIST, map_add (map, k2,2) ); uint64_t x_v, y_v; - map_iter_t *iter = map_iter_begin(map, NULL); + map_iter_t *iter = map_iter_begin(map, 0); x_v = map_iter_next(iter, &x_k); y_v = map_iter_next(iter, &y_k); ASSERT_EQUAL( DOES_NOT_EXIST, map_iter_next(iter, NULL) ); @@ -259,9 +260,9 @@ void big_iteration_test (CuTest* tc) { nstring_t *k4 = ns_alloc(3); strcpy(k1->data, "k4"); #else map_t *map = map_alloc(map_type_, NULL); - void *k3 = (void *)3; - void *k4 = (void *)4; - void *key; + map_key_t k3 = (map_key_t)3; + map_key_t k4 = (map_key_t)4; + map_key_t key; #endif for (size_t i = 1; i <= n; ++i) { @@ -269,7 +270,7 @@ void big_iteration_test (CuTest* tc) { memset(key->data, 0, key->len); snprintf(key->data, key->len, "k%llu", i); #else - key = (void *)i; + key = (map_key_t)i; #endif ASSERT_EQUAL( DOES_NOT_EXIST, map_get(map, key) ); ASSERT_EQUAL( DOES_NOT_EXIST, map_set(map, key, i) ); @@ -282,7 +283,7 @@ void big_iteration_test (CuTest* tc) { uint64_t sum = 0; uint64_t val; - map_iter_t *iter = map_iter_begin(map, NULL); + map_iter_t *iter = map_iter_begin(map, 0); while ((val = map_iter_next(iter, NULL)) != DOES_NOT_EXIST) { sum += val; } @@ -291,7 +292,7 @@ void big_iteration_test (CuTest* tc) { ASSERT_EQUAL(3, map_remove(map, k3)); ASSERT_EQUAL(4, map_remove(map, k4)); sum = 0; - iter = map_iter_begin(map, NULL); + iter = map_iter_begin(map, 0); while ((val = map_iter_next(iter, NULL)) != DOES_NOT_EXIST) { sum += val; } @@ -317,9 +318,9 @@ int main (void) { CuSuite* suite = CuSuiteNew(); SUITE_ADD_TEST(suite, basic_test); - SUITE_ADD_TEST(suite, concurrent_add_remove_test); SUITE_ADD_TEST(suite, basic_iteration_test); SUITE_ADD_TEST(suite, big_iteration_test); + SUITE_ADD_TEST(suite, concurrent_add_remove_test); CuSuiteRun(suite); CuSuiteDetails(suite, output); diff --git a/test/txn_test.c b/test/txn_test.c index 80b8f93..5a7606a 100644 --- a/test/txn_test.c +++ b/test/txn_test.c @@ -13,7 +13,7 @@ void test1 (CuTest* tc) { map_t *map = map_alloc(&ht_map_impl, NULL); txn_t *t1 = txn_begin(map); txn_t *t2 = txn_begin(map); - void *k1 = (void *)1; + map_key_t k1 = (map_key_t)1; txn_map_set(t1, k1, 2); txn_map_set(t1, k1, 3); ASSERT_EQUAL( DOES_NOT_EXIST, txn_map_get(t2, k1) ); diff --git a/todo b/todo index da53a45..e73a5a9 100644 --- a/todo +++ b/todo @@ -11,11 +11,11 @@ memory manangement ------------------ - make rcu yield when its buffer gets full instead of throwing an assert -- alternate memory reclamation schemes, hazard pointers and/or reference count -- verify key management in list, skiplist, and hashtable +- alternate memory reclamation schemes: hazard pointers and/or reference counting +- verify the key management in list, skiplist, and hashtable -quaility --------- +quality +------- - transaction tests - port perf tests from lib-high-scale - characterize the performance of hashtable, list and skiplist @@ -33,7 +33,7 @@ optimization features -------- -- 32 bit version of hashtable -- verify list and skiplist work on 32 bit platforms +- a version of hashtable for 32bit keys and values +- verify correctness on 32 bit platforms - allow values of 0 to be inserted into maps (change DOES_NOT_EXIST to something else) - seperate nbd_malloc/nbd_free into general purpose malloc/free replacement diff --git a/txn/txn.c b/txn/txn.c index bb304b3..2893d35 100644 --- a/txn/txn.c +++ b/txn/txn.c @@ -14,13 +14,13 @@ typedef struct update_rec update_t; struct update_rec { - update_t *next; // an earlier update uint64_t version; - uint64_t value; + map_val_t value; + map_val_t next; // an earlier update }; typedef struct write_rec { - void *key; + map_key_t key; update_t *rec; } write_rec_t; @@ -29,16 +29,16 @@ struct txn { uint64_t wv; map_t *map; write_rec_t *writes; - uint32_t writes_size; - uint32_t writes_count; - uint32_t writes_scan; + uint64_t writes_size; + uint64_t writes_count; + uint64_t writes_scan; txn_state_e state; }; -static uint64_t version_ = 1; - static txn_state_e txn_validate (txn_t *txn); +static uint64_t version_ = 1; + static skiplist_t *active_ = NULL; void txn_init (void) { @@ -51,20 +51,21 @@ void txn_init (void) { // If we encounter a potential conflict with a transaction that is in the process of validating, we help it // complete validating. It must be finished before we can decide to rollback or commit. // -static txn_state_e validate_key (txn_t *txn, void *key) { +static txn_state_e validate_key (txn_t *txn, map_key_t key) { assert(txn->state != TXN_RUNNING); - update_t *update = (update_t *) map_get(txn->map, key); - for (; update != NULL; update = update->next) { + map_val_t val = map_get(txn->map, key); + update_t *update = NULL; + for (; val != DOES_NOT_EXIST; val = update->next) { // If the update or its version is not tagged it means the update is committed. // // We can stop at the first committed record we find that is at least as old as our read version. All // the other committed records following it will be older. And all the uncommitted records following it // will eventually conflict with it and abort. - if (!IS_TAGGED(update, TAG2)) + if (!IS_TAGGED(val, TAG2)) return TXN_VALIDATED; - update = (update_t *)STRIP_TAG(update, TAG2); + update = (update_t *)(size_t)STRIP_TAG(val, TAG2); if (!IS_TAGGED(update->version, TAG1)) return (update->version <= txn->rv) ? TXN_VALIDATED : TXN_ABORTED; @@ -76,7 +77,7 @@ static txn_state_e validate_key (txn_t *txn, void *key) { continue; // The update's transaction is still in progress. Access its txn_t. - txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1); + txn_t *writer = (txn_t *)(size_t)STRIP_TAG(update->version, TAG1); if (writer == txn) continue; // Skip our own updates. txn_state_e writer_state = writer->state; @@ -168,7 +169,7 @@ txn_t *txn_begin (map_t *map) { uint64_t temp = 0; do { old_count = temp; - temp = (uint64_t)sl_cas(active_, (void *)txn->rv, old_count, old_count + 1); + temp = (uint64_t)sl_cas(active_, (map_key_t)txn->rv, old_count, old_count + 1); } while (temp != old_count); if (txn->rv == version_) @@ -177,7 +178,7 @@ txn_t *txn_begin (map_t *map) { temp = 1; do { old_count = temp; - temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1); + temp = sl_cas(active_, (map_key_t)txn->rv, old_count, old_count - 1); } while (temp != old_count); } while (1); @@ -219,9 +220,9 @@ txn_state_e txn_commit (txn_t *txn) { uint64_t old_count; do { old_count = temp; - temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1); + temp = sl_cas(active_, (map_key_t)txn->rv, old_count, old_count - 1); if (temp == 1 && txn->rv != version_) { - sl_remove(active_, (void *)txn->rv); + sl_remove(active_, (map_key_t)txn->rv); break; } } while (old_count != temp); @@ -233,22 +234,20 @@ txn_state_e txn_commit (txn_t *txn) { } // Get most recent committed version prior to our read version. -uint64_t txn_map_get (txn_t *txn, void *key) { +uint64_t txn_map_get (txn_t *txn, map_key_t key) { if (txn->state != TXN_RUNNING) return ERROR_TXN_NOT_RUNNING; - update_t *newest_update = (update_t *) map_get(txn->map, key); - if (!IS_TAGGED(newest_update, TAG2)) - return (uint64_t)newest_update; - // Iterate through the update records to find the latest committed version prior to our read version. - update_t *update; - for (update = newest_update; ; update = update->next) { + uint64_t newest_val = map_get(txn->map, key); + uint64_t val = newest_val; + update_t *update = NULL; + for ( ; ; val = update->next) { - if (!IS_TAGGED(update, TAG2)) - return (uint64_t)update; + if (!IS_TAGGED(val, TAG2)) + return val; - update = (update_t *)STRIP_TAG(update, TAG2); + update = (update_t *)(size_t)STRIP_TAG(val, TAG2); assert(update != NULL); // If the update's version is not tagged it means the update is committed. @@ -266,7 +265,7 @@ uint64_t txn_map_get (txn_t *txn, void *key) { continue; // The update's transaction is still in progress. Access its txn_t. - txn_t *writer = (txn_t *)STRIP_TAG(update->version, TAG1); + txn_t *writer = (txn_t *)(size_t)STRIP_TAG(update->version, TAG1); if (writer == txn) // found our own update break; // success @@ -293,73 +292,77 @@ uint64_t txn_map_get (txn_t *txn, void *key) { uint64_t value = update->value; // collect some garbage - update_t *last = update; - update_t *next = update->next; - uint64_t min_active = 0; - if (IS_TAGGED(next, TAG2)) { - next = (update_t *)STRIP_TAG(next, TAG2); - min_active = (uint64_t)sl_min_key(active_); - if (next->version < min_active) { - - // Skip over aborted versions to verify the chain of updates is old enough for collection - update_t *temp = next; + uint64_t min_active_version = UNDETERMINED_VERSION; + update_t *next_update = NULL; + if (IS_TAGGED(update->next, TAG2)) { + next_update = (update_t *)(size_t)STRIP_TAG(update->next, TAG2); + min_active_version = (uint64_t)sl_min_key(active_); + if (next_update->version < min_active_version) { + // (and all update records following it [execpt if it is aborted]) is old enough that it is + // not visible to any active transaction. We can safely free it. + + // Skip over aborted versions to look for more recent updates + update_t *temp = next_update; while (temp->version == ABORTED_VERSION) { assert(!IS_TAGGED(temp->version, TAG1)); - update_t *temp = next->next; - if (!IS_TAGGED(temp, TAG2)) + uint64_t next = next_update->next; + if (!IS_TAGGED(next, TAG2)) break; - temp = (update_t *)STRIP_TAG(temp, TAG2); - if (temp->version >= min_active) + + temp = (update_t *)(size_t)STRIP_TAG(next, TAG2); + if (temp->version >= min_active_version) return value; - temp = temp->next; } - // collect and all the update records following it - do { - next = SYNC_SWAP(&update->next, NULL); + // free and all the update records following it + temp = next_update; + while (1) { + uint64_t next = SYNC_SWAP(&temp->next, DOES_NOT_EXIST); // if we find ourself in a race just back off and let the other thread take care of it - if (next == NULL) + if (next == DOES_NOT_EXIST) return value; - update = next; - next = next->next; + if (!IS_TAGGED(next, TAG2)) + break; + + temp = (update_t *)(size_t)STRIP_TAG(next, TAG2); nbd_free(update); - } while (IS_TAGGED(next, TAG2)); + } } } // If there is one item left and it is visible by all active transactions we can merge it into the map itself. // There is no need for an update record. - if (next == NULL && last == (update_t *)STRIP_TAG(newest_update, TAG2)) { - if (min_active == UNDETERMINED_VERSION) { - min_active = (uint64_t)sl_min_key(active_); + if (next_update == NULL && val == newest_val) { + if (min_active_version == UNDETERMINED_VERSION) { + min_active_version = (uint64_t)sl_min_key(active_); } - if (last->version <= min_active) { - if (map_cas(txn->map, key, TAG_VALUE(last, TAG2), value) == TAG_VALUE(last, TAG2)) { - nbd_defer_free(last); + if (update->version <= min_active_version) { + if (map_cas(txn->map, key, TAG_VALUE(val, TAG2), value) == TAG_VALUE(val, TAG2)) { + nbd_defer_free(update); } } - } + } return value; } -void txn_map_set (txn_t *txn, void *key, uint64_t value) { +void txn_map_set (txn_t *txn, map_key_t key, uint64_t value) { if (txn->state != TXN_RUNNING) return; // TODO: return some sort of error code // create a new update record update_t *update = alloc_update_rec(); update->value = value; - update->version = TAG_VALUE(txn, TAG1); + update->version = TAG_VALUE((uint64_t)(size_t)txn, TAG1); // push the new update record onto 's update list uint64_t old_update; do { old_update = map_get(txn->map, key); - update->next = (update_t *)old_update; - } while (map_cas(txn->map, key, old_update, TAG_VALUE(update, TAG2)) != old_update); + update->next = old_update; + } while (map_cas(txn->map, key, old_update, TAG_VALUE((uint64_t)(size_t)update, TAG2)) != old_update); // add to the write set for commit-time validation if (txn->writes_count == txn->writes_size) {