]> pd.if.org Git - nbds/blobdiff - txn/txn.c
refactor header files
[nbds] / txn / txn.c
index 1073f7b6a24e3ae25fe5f124e7b11496f19604a5..e05d332386fe8284727fc0ced57c107d26a59cf2 100644 (file)
--- a/txn/txn.c
+++ b/txn/txn.c
@@ -5,6 +5,7 @@
 #include "common.h"
 #include "txn.h"
 #include "mem.h"
+#include "skiplist.h"
 
 #define UNDETERMINED_VERSION 0
 #define INITIAL_WRITES_SIZE  4
@@ -21,7 +22,7 @@ struct update_rec {
 };
 
 typedef struct write_rec {
-    const char *key;
+    void *key;
     update_rec_t *rec; 
 } write_rec_t;
 
@@ -33,20 +34,25 @@ struct txn {
     uint32_t writes_size;
     uint32_t writes_count;
     uint32_t writes_scan;
-    txn_access_e access;
-    txn_isolation_e isolation;
+    txn_type_e type;
     txn_state_e state;
 };
 
+static uint64_t version_ = 1;
+
 static txn_state_e txn_validate (txn_t *txn);
 
-static uint64_t version_ = 1;
+static map_t *active_ = NULL;
+
+void txn_init (void) {
+    active_ = map_alloc(MAP_TYPE_SKIPLIST, NULL);
+}
 
 // Validate the updates for <key>. Validation fails for a key we have written to if there is a 
 // write committed newer than our read version.
-static txn_state_e tm_validate_key (txn_t *txn, const char *key, uint32_t key_len) {
+static txn_state_e tm_validate_key (txn_t *txn, void *key) {
     
-    update_rec_t *update = (update_rec_t *) map_get(txn->map, key, key_len);
+    update_rec_t *update = (update_rec_t *) map_get(txn->map, key);
     for (; update != NULL; update = update->prev) {
         uint64_t writer_version = update->version;
         if (writer_version <= txn->rv)
@@ -102,7 +108,7 @@ static txn_state_e txn_validate (txn_t *txn) {
             }
 
             for (i = 0; i < txn->writes_count; ++i) {
-                txn_state_e s = tm_validate_key(txn, txn->writes[i].key, strlen(txn->writes[i].key));
+                txn_state_e s = tm_validate_key(txn, txn->writes[i].key);
                 if (s == TXN_ABORTED) {
                     txn->state = TXN_ABORTED;
                     break;
@@ -130,19 +136,39 @@ static update_rec_t *alloc_update_rec (void) {
     return u;
 }
 
-txn_t *txn_begin (txn_access_e access, txn_isolation_e isolation, map_type_t map_type) {
+txn_t *txn_begin (txn_type_e type, map_t *map) {
     txn_t *txn = (txn_t *)nbd_malloc(sizeof(txn_t));
     memset(txn, 0, sizeof(txn_t));
-    txn->access = access;
-    txn->isolation = isolation;
-    txn->rv = version_;
+    txn->type = type;
     txn->wv = UNDETERMINED_VERSION;
     txn->state = TXN_RUNNING;
-    txn->map = map_alloc(map_type);
-    if (isolation != TXN_READ_ONLY) {
+    txn->map = map;
+    if (type != TXN_READ_ONLY) {
         txn->writes = nbd_malloc(sizeof(*txn->writes) * INITIAL_WRITES_SIZE);
         txn->writes_size = INITIAL_WRITES_SIZE;
     }
+
+    // aquire the read version for txn.
+    do {
+        txn->rv = version_;
+
+        uint64_t old_count;
+        uint64_t temp = 0;
+        do {
+            old_count = temp;
+            temp = (uint64_t)map_cas(active_, (void *)txn->rv, old_count, old_count + 1);
+        } while (temp != old_count);
+
+        if (txn->rv == version_)
+            break;
+
+        temp = 1;
+        do {
+            old_count = temp;
+            temp = map_cas(active_, (void *)txn->rv, old_count, old_count - 1);
+        } while (temp != old_count);
+    } while (1);
+
     return txn;
 }
 
@@ -179,11 +205,11 @@ txn_state_e txn_commit (txn_t *txn) {
 }
 
 // Get most recent committed version prior to our read version.
-uint64_t tm_get (txn_t *txn, const char *key, uint32_t key_len) {
+uint64_t tm_get (txn_t *txn, void *key) {
 
     // Iterate through update records associated with <key> to find the latest committed version. 
     // We can use the first matching version. Older updates always come later in the list.
-    update_rec_t *update = (update_rec_t *) map_get(txn->map, key, key_len);
+    update_rec_t *update = (update_rec_t *) map_get(txn->map, key);
     for (; update != NULL; update = update->prev) {
         uint64_t writer_version = update->version;
         if (writer_version < txn->rv)
@@ -213,7 +239,7 @@ uint64_t tm_get (txn_t *txn, const char *key, uint32_t key_len) {
     return DOES_NOT_EXIST;
 }
 
-void tm_set (txn_t *txn, const char *key, uint32_t key_len, uint64_t value) {
+void tm_set (txn_t *txn, void *key, uint64_t value) {
 
     // create a new update record
     update_rec_t *update = alloc_update_rec();
@@ -224,9 +250,9 @@ void tm_set (txn_t *txn, const char *key, uint32_t key_len, uint64_t value) {
     // push the new update record onto <key>'s update list
     uint64_t update_prev;
     do {
-        update->prev = (update_rec_t *) map_get(txn->map, key, key_len);
+        update->prev = (update_rec_t *) map_get(txn->map, key);
         update_prev = (uint64_t)update->prev;
-    } while (map_cas(txn->map, key, key_len, update_prev, (uint64_t)update) != update_prev);
+    } while (map_cas(txn->map, key, update_prev, (uint64_t)update) != update_prev);
 
     // add <key> to the write set for commit-time validation
     if (txn->writes_count == txn->writes_size) {