]> pd.if.org Git - nbds/commitdiff
in txn, clean up old update records when they can't be referenced anymore
authorjdybnis <jdybnis@9ec2166a-aeea-11dd-8830-69e4bb380a4a>
Sun, 7 Dec 2008 10:31:08 +0000 (10:31 +0000)
committerjdybnis <jdybnis@9ec2166a-aeea-11dd-8830-69e4bb380a4a>
Sun, 7 Dec 2008 10:31:08 +0000 (10:31 +0000)
include/skiplist.h
map/skiplist.c
txn/txn.c

index ca6b5143d224db0cd37957b5c80859b56cfbf59a..08d32cfa0001d8c4fc0d63cb3c98df33f587093b 100644 (file)
@@ -5,13 +5,14 @@
 
 typedef struct sl skiplist_t;
 
-skiplist_t *sl_alloc (const datatype_t *key_type);
+skiplist_t * sl_alloc (const datatype_t *key_type);
 uint64_t sl_cas    (skiplist_t *sl, void *key, uint64_t expected_val, uint64_t new_val);
 uint64_t sl_lookup (skiplist_t *sl, void *key);
 uint64_t sl_remove (skiplist_t *sl, void *key);
 uint64_t sl_count  (skiplist_t *sl);
 void     sl_print  (skiplist_t *sl);
 void     sl_free   (skiplist_t *sl);
+void *   sl_min_key(skiplist_t *sl);
 
 static const map_impl_t sl_map_impl = { 
     (map_alloc_t)sl_alloc, (map_cas_t)sl_cas, (map_get_t)sl_lookup, (map_remove_t)sl_remove, 
index 52d7f1a622a40242029e0e15cf273b3b875970a4..49ef21b3d32bcfe43c0e38cfdad883be1994eda5 100644 (file)
@@ -231,6 +231,17 @@ uint64_t sl_lookup (skiplist_t *sl, void *key) {
     return DOES_NOT_EXIST;
 }
 
+void *sl_min_key (skiplist_t *sl) {
+    node_t *item = sl->head->next[0];
+    while (item != NULL) {
+        node_t *next = item->next[0];
+        if (!IS_TAGGED(next))
+            return item->key;
+        item = (node_t *)STRIP_TAG(next);
+    }
+    return DOES_NOT_EXIST;
+}
+
 uint64_t sl_cas (skiplist_t *sl, void *key, uint64_t expectation, uint64_t new_val) {
     TRACE("s1", "sl_cas: key %p skiplist %p", key, sl);
     TRACE("s1", "sl_cas: expectation %p new value %p", expectation, new_val);
index 6e6b7973703f9b5792e6dbef772a1c6ab0895be3..991c22c8a8351002922df123cc6413c925676a5f 100644 (file)
--- a/txn/txn.c
+++ b/txn/txn.c
@@ -43,10 +43,10 @@ static uint64_t version_ = 1;
 
 static txn_state_e txn_validate (txn_t *txn);
 
-static map_t *active_ = NULL;
+static skiplist_t *active_ = NULL;
 
 void txn_init (void) {
-    active_ = map_alloc(&sl_map_impl, NULL);
+    active_ = sl_alloc(NULL);
 }
 
 // Validate the updates for <key>. Validation fails if there is a write-write conflict. That is if after our 
@@ -169,7 +169,7 @@ txn_t *txn_begin (txn_type_e type, map_t *map) {
         uint64_t temp = 0;
         do {
             old_count = temp;
-            temp = (uint64_t)map_cas(active_, (void *)txn->rv, old_count, old_count + 1);
+            temp = (uint64_t)sl_cas(active_, (void *)txn->rv, old_count, old_count + 1);
         } while (temp != old_count);
 
         if (txn->rv == version_)
@@ -178,7 +178,7 @@ txn_t *txn_begin (txn_type_e type, map_t *map) {
         temp = 1;
         do {
             old_count = temp;
-            temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+            temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
         } while (temp != old_count);
     } while (1);
 
@@ -211,18 +211,17 @@ txn_state_e txn_commit (txn_t *txn) {
         update->version = wv;
     }
 
-    /*
     // Lower the reference count for <txn>'s read version
-    uint64_t temp = 1;
+    uint64_t temp = 2;
     uint64_t old_count;
     do {
         old_count = temp;
-        temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+        temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+        if (temp == 1 && txn->rv != version_) {
+            sl_remove(active_, (void *)txn->rv);
+            break;
+        }
     } while (old_count != temp);
-    if (old_count == 0 && version_ != txn->rv) {
-        map_remove(active_, (void *)txn->rv);
-    }
-    */
 
     nbd_defer_free(txn->writes);
     nbd_defer_free(txn);
@@ -241,7 +240,7 @@ uint64_t tm_get (txn_t *txn, void *key) {
         // If the update's version is not tagged it means the update is committed.
         if (!IS_TAGGED(update->version)) {
             if (update->version <= txn->rv)
-                return update->value;
+                break; // success
             continue;
         }
 
@@ -255,7 +254,7 @@ uint64_t tm_get (txn_t *txn, void *key) {
         // The update's transaction is still in progress. Access its txn_t.
         txn_t *writer = (txn_t *)STRIP_TAG(update->version);
         if (writer == txn) // found our own update
-            return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
+            break; // success 
 
         txn_state_e writer_state = writer->state;
         if (writer_state == TXN_RUNNING)
@@ -274,9 +273,30 @@ uint64_t tm_get (txn_t *txn, void *key) {
         assert(writer_state == TXN_VALIDATED);
         if (writer->wv > txn->rv)
             continue;
-        return  update->value;
+        break; // success
     }
-    return DOES_NOT_EXIST;
+
+    if (EXPECT_FALSE(update == NULL))
+        return DOES_NOT_EXIST;
+
+    // collect some garbage
+    update_rec_t *next = update->next;
+    if (next != NULL) {
+        uint64_t min_active_version = (uint64_t)sl_min_key(active_);
+        if (next->version < min_active_version) {
+            next = SYNC_SWAP(&update->next, NULL);
+            while (next != NULL) {
+                update = next;
+                next = NULL;
+                if (update->next != NULL) {
+                    next = SYNC_SWAP(&update->next, NULL);
+                }
+                nbd_free(update);
+            }
+        }
+    }
+    
+    return update->value;
 }
 
 void tm_set (txn_t *txn, void *key, uint64_t value) {