]> pd.if.org Git - nbds/blobdiff - txn/txn.c
in txn, clean up old update records when they can't be referenced anymore
[nbds] / txn / txn.c
index 6e6b7973703f9b5792e6dbef772a1c6ab0895be3..991c22c8a8351002922df123cc6413c925676a5f 100644 (file)
--- a/txn/txn.c
+++ b/txn/txn.c
@@ -43,10 +43,10 @@ static uint64_t version_ = 1;
 
 static txn_state_e txn_validate (txn_t *txn);
 
 
 static txn_state_e txn_validate (txn_t *txn);
 
-static map_t *active_ = NULL;
+static skiplist_t *active_ = NULL;
 
 void txn_init (void) {
 
 void txn_init (void) {
-    active_ = map_alloc(&sl_map_impl, NULL);
+    active_ = sl_alloc(NULL);
 }
 
 // Validate the updates for <key>. Validation fails if there is a write-write conflict. That is if after our 
 }
 
 // Validate the updates for <key>. Validation fails if there is a write-write conflict. That is if after our 
@@ -169,7 +169,7 @@ txn_t *txn_begin (txn_type_e type, map_t *map) {
         uint64_t temp = 0;
         do {
             old_count = temp;
         uint64_t temp = 0;
         do {
             old_count = temp;
-            temp = (uint64_t)map_cas(active_, (void *)txn->rv, old_count, old_count + 1);
+            temp = (uint64_t)sl_cas(active_, (void *)txn->rv, old_count, old_count + 1);
         } while (temp != old_count);
 
         if (txn->rv == version_)
         } while (temp != old_count);
 
         if (txn->rv == version_)
@@ -178,7 +178,7 @@ txn_t *txn_begin (txn_type_e type, map_t *map) {
         temp = 1;
         do {
             old_count = temp;
         temp = 1;
         do {
             old_count = temp;
-            temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+            temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
         } while (temp != old_count);
     } while (1);
 
         } while (temp != old_count);
     } while (1);
 
@@ -211,18 +211,17 @@ txn_state_e txn_commit (txn_t *txn) {
         update->version = wv;
     }
 
         update->version = wv;
     }
 
-    /*
     // Lower the reference count for <txn>'s read version
     // Lower the reference count for <txn>'s read version
-    uint64_t temp = 1;
+    uint64_t temp = 2;
     uint64_t old_count;
     do {
         old_count = temp;
     uint64_t old_count;
     do {
         old_count = temp;
-        temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+        temp = sl_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+        if (temp == 1 && txn->rv != version_) {
+            sl_remove(active_, (void *)txn->rv);
+            break;
+        }
     } while (old_count != temp);
     } while (old_count != temp);
-    if (old_count == 0 && version_ != txn->rv) {
-        map_remove(active_, (void *)txn->rv);
-    }
-    */
 
     nbd_defer_free(txn->writes);
     nbd_defer_free(txn);
 
     nbd_defer_free(txn->writes);
     nbd_defer_free(txn);
@@ -241,7 +240,7 @@ uint64_t tm_get (txn_t *txn, void *key) {
         // If the update's version is not tagged it means the update is committed.
         if (!IS_TAGGED(update->version)) {
             if (update->version <= txn->rv)
         // If the update's version is not tagged it means the update is committed.
         if (!IS_TAGGED(update->version)) {
             if (update->version <= txn->rv)
-                return update->value;
+                break; // success
             continue;
         }
 
             continue;
         }
 
@@ -255,7 +254,7 @@ uint64_t tm_get (txn_t *txn, void *key) {
         // The update's transaction is still in progress. Access its txn_t.
         txn_t *writer = (txn_t *)STRIP_TAG(update->version);
         if (writer == txn) // found our own update
         // The update's transaction is still in progress. Access its txn_t.
         txn_t *writer = (txn_t *)STRIP_TAG(update->version);
         if (writer == txn) // found our own update
-            return update->type == UPDATE_TYPE_DELETE ? DOES_NOT_EXIST : update->value;
+            break; // success 
 
         txn_state_e writer_state = writer->state;
         if (writer_state == TXN_RUNNING)
 
         txn_state_e writer_state = writer->state;
         if (writer_state == TXN_RUNNING)
@@ -274,9 +273,30 @@ uint64_t tm_get (txn_t *txn, void *key) {
         assert(writer_state == TXN_VALIDATED);
         if (writer->wv > txn->rv)
             continue;
         assert(writer_state == TXN_VALIDATED);
         if (writer->wv > txn->rv)
             continue;
-        return  update->value;
+        break; // success
     }
     }
-    return DOES_NOT_EXIST;
+
+    if (EXPECT_FALSE(update == NULL))
+        return DOES_NOT_EXIST;
+
+    // collect some garbage
+    update_rec_t *next = update->next;
+    if (next != NULL) {
+        uint64_t min_active_version = (uint64_t)sl_min_key(active_);
+        if (next->version < min_active_version) {
+            next = SYNC_SWAP(&update->next, NULL);
+            while (next != NULL) {
+                update = next;
+                next = NULL;
+                if (update->next != NULL) {
+                    next = SYNC_SWAP(&update->next, NULL);
+                }
+                nbd_free(update);
+            }
+        }
+    }
+    
+    return update->value;
 }
 
 void tm_set (txn_t *txn, void *key, uint64_t value) {
 }
 
 void tm_set (txn_t *txn, void *key, uint64_t value) {