]> pd.if.org Git - btree/commitdiff
clean up atomic delete deadlock problem
authorunknown <karl@E04.petzent.com>
Sat, 27 Sep 2014 10:48:03 +0000 (03:48 -0700)
committerunknown <karl@E04.petzent.com>
Sat, 27 Sep 2014 10:48:03 +0000 (03:48 -0700)
threadskv8.c

index 574bbd9b275f4a096f1de23e595af7df1b36602e..f2a23c7d1ecbfddbf4957e65831b386a973c9946 100644 (file)
@@ -184,7 +184,8 @@ typedef enum {
        Unique,
        Librarian,
        Duplicate,
-       Delete
+       Delete,
+       Update
 } BtSlotType;
 
 typedef struct {
@@ -2234,8 +2235,9 @@ typedef struct {
 typedef struct {
        uid page_no;            // page number for split leaf
        void *next;                     // next key to insert
-       uint entry:30;          // latch table entry number
+       uint entry:29;          // latch table entry number
        uint type:2;            // 0 == insert, 1 == delete, 2 == free
+       uint nounlock:1;        // don't unlock ParentModification
        unsigned char leafkey[BT_keyarray];
 } AtomicKey;
 
@@ -2403,6 +2405,18 @@ BtVal *val;
        return 0;
 }
 
+uint bt_parentmatch (AtomicKey *head, uint entry)
+{
+uint cnt = 0;
+
+       if( head )
+         do if( head->entry == entry )
+               head->nounlock = 1, cnt++;
+         while( head = head->next );
+
+       return cnt;
+}
+
 //     atomic modification of a batch of keys.
 
 //     return -1 if BTERR is set
@@ -2602,12 +2616,11 @@ int type;
          //  schedule prev fence key update
 
          ptr = keyptr(prev->page,prev->page->cnt);
-         leaf = malloc (sizeof(AtomicKey));
+         leaf = calloc (sizeof(AtomicKey), 1);
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
          leaf->page_no = prev->latch->page_no;
          leaf->entry = prev->latch->entry;
-         leaf->next = NULL;
          leaf->type = 0;
 
          if( tail )
@@ -2647,12 +2660,11 @@ int type;
          //  Process last page split in chain
 
          ptr = keyptr(prev->page,prev->page->cnt);
-         leaf = malloc (sizeof(AtomicKey));
+         leaf = calloc (sizeof(AtomicKey), 1);
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
          leaf->page_no = prev->latch->page_no;
          leaf->entry = prev->latch->entry;
-         leaf->next = NULL;
          leaf->type = 0;
   
          if( tail )
@@ -2683,12 +2695,11 @@ int type;
        // Delete empty master's fence
 
        ptr = keyptr(prev->page,prev->page->cnt);
-       leaf = malloc (sizeof(AtomicKey));
+       leaf = calloc (sizeof(AtomicKey), 1);
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
        leaf->page_no = prev->latch->page_no;
        leaf->entry = prev->latch->entry;
-       leaf->next = NULL;
        leaf->type = 1;
   
        if( tail )
@@ -2701,14 +2712,13 @@ int type;
        bt_lockpage(BtLockParent, prev->latch);
        bt_unlockpage(BtLockWrite, prev->latch);
 
+       // grab master's right sibling
+
        if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
                set->page = bt_mappage (bt, set->latch);
        else
                return -1;
 
-       //      add page to our transaction
-
-       bt_lockpage(BtLockAtomic, set->latch);
        bt_lockpage(BtLockWrite, set->latch);
 
        //      pull contents over empty page
@@ -2720,18 +2730,24 @@ int type;
        set->latch->dirty = 1;
        set->page->kill = 1;
 
-       //      add new parent key for new master page contents
-       //      delete it after parent posts the new master fence.
+       //      add new fence key for new master page contents, delete
+       //      right sibling after parent posts the new master fence.
 
        ptr = keyptr(set->page,set->page->cnt);
-       leaf = malloc (sizeof(AtomicKey));
+       leaf = calloc (sizeof(AtomicKey), 1);
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
        leaf->page_no = prev->latch->page_no;
        leaf->entry = set->latch->entry;
-       leaf->next = NULL;
        leaf->type = 2;
   
+       //  see if right sibling is not yet in the FIFO
+
+       if( !bt_parentmatch (head, leaf->entry) )
+         bt_lockpage(BtLockParent, set->latch);
+
+       bt_unlockpage(BtLockWrite, set->latch);
+
        if( tail )
          tail->next = leaf;
        else
@@ -2739,9 +2755,6 @@ int type;
 
        tail = leaf;
 
-//     bt_lockpage(BtLockParent, set->latch);
-       bt_unlockpage(BtLockWrite, set->latch);
-
        //  fix master's far right sibling's left pointer
 
        if( right = bt_getid (set->page->right) ) {
@@ -2774,27 +2787,27 @@ int type;
            if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
                  return -1;
 
-           bt_unlockpage (BtLockParent, set->latch);
                break;
 
          case 1:       // delete key
                if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
                  return -1;
 
-           bt_unlockpage (BtLockParent, set->latch);
                break;
 
          case 2:       // insert key & free
            if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
                  return -1;
 
-               bt_unlockpage (BtLockAtomic, set->latch);
                bt_lockpage (BtLockDelete, set->latch);
                bt_lockpage (BtLockWrite, set->latch);
                bt_freepage (bt, set);
                break;
          }
 
+         if( !leaf->nounlock )
+           bt_unlockpage (BtLockParent, set->latch);
+
          bt_unpinlatch (set->latch);
          tail = leaf->next;
          free (leaf);
@@ -3166,7 +3179,7 @@ FILE *in;
                          nxt -= 1;
                          txn[nxt] = 10;
                          slotptr(page,++cnt)->off  = nxt;
-                         slotptr(page,cnt)->type = Unique;
+                         slotptr(page,cnt)->type = Delete;
                          len = 0;
 
                          if( cnt < args->num )