]> pd.if.org Git - btree/blobdiff - threadskv8.c
clean up atomic delete deadlock problem
[btree] / threadskv8.c
index 44793a09d99365c97979c5bd79d44905ff555283..f2a23c7d1ecbfddbf4957e65831b386a973c9946 100644 (file)
@@ -155,11 +155,6 @@ typedef struct {
        uint prev;                              // prev entry in hash table chain
        volatile ushort pin;    // number of outstanding threads
        ushort dirty:1;                 // page in cache is dirty
-#ifdef unix
-       pthread_t atomictid;    // pid holding atomic lock
-#else
-       uint atomictid;                 // pid holding atomic lock
-#endif
 } BtLatchSet;
 
 //     Define the length of the page record numbers
@@ -189,7 +184,8 @@ typedef enum {
        Unique,
        Librarian,
        Duplicate,
-       Delete
+       Delete,
+       Update
 } BtSlotType;
 
 typedef struct {
@@ -652,7 +648,6 @@ BtLatchSet *latch = bt->mgr->latchsets + slot;
                bt->mgr->latchsets[latch->next].prev = slot;
 
        bt->mgr->hashtable[hashidx].slot = slot;
-       memset (&latch->atomictid, 0, sizeof(latch->atomictid));
        latch->page_no = page_no;
        latch->entry = slot;
        latch->split = 0;
@@ -1139,10 +1134,6 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch)
        case BtLockAtomic:
                WriteLock (latch->atomic);
                break;
-       case BtLockAtomic | BtLockRead:
-               WriteLock (latch->atomic);
-               ReadLock (latch->readwr);
-               break;
        }
 }
 
@@ -1167,12 +1158,6 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch)
                WriteRelease (latch->parent);
                break;
        case BtLockAtomic:
-               memset (&latch->atomictid, 0, sizeof(latch->atomictid));
-               WriteRelease (latch->atomic);
-               break;
-       case BtLockAtomic | BtLockRead:
-               ReadRelease (latch->readwr);
-               memset (&latch->atomictid, 0, sizeof(latch->atomictid));
                WriteRelease (latch->atomic);
                break;
        }
@@ -1293,20 +1278,10 @@ uint mode, prevmode;
          prevpage = 0;
        }
 
-       //      skip Atomic lock on leaf page we need to slide over
-
-       if( !drill )
-        if( mode & BtLockAtomic )
-         if( pthread_equal (set->latch->atomictid, pthread_self()) )
-               mode &= ~BtLockAtomic;
-
        // obtain mode lock using lock chaining through AccessLock
 
        bt_lockpage(mode, set->latch);
 
-       if( mode & BtLockAtomic )
-               set->latch->atomictid = pthread_self();
-
        if( set->page->free )
                return bt->err = BTERR_struct, 0;
 
@@ -1342,11 +1317,13 @@ uint mode, prevmode;
          if( drill == lvl )
                return slot;
 
+         // find next non-dead slot -- the fence key if nothing else
+
          while( slotptr(set->page, slot)->dead )
                if( slot++ < set->page->cnt )
-                       continue;
+                 continue;
                else
-                       goto slideright;
+                 return bt->err = BTERR_struct, 0;
 
          page_no = bt_getid(valptr(set->page, slot)->value);
          drill--;
@@ -2258,11 +2235,87 @@ typedef struct {
 typedef struct {
        uid page_no;            // page number for split leaf
        void *next;                     // next key to insert
-       uint entry:30;          // latch table entry number
+       uint entry:29;          // latch table entry number
        uint type:2;            // 0 == insert, 1 == delete, 2 == free
+       uint nounlock:1;        // don't unlock ParentModification
        unsigned char leafkey[BT_keyarray];
 } AtomicKey;
 
+//  find and load leaf page for given key
+//     leave page Atomic locked and Read locked.
+
+int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
+{
+BtLatchSet *prevlatch;
+uid page_no;
+uint slot;
+
+  //  find level zero page
+
+  if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
+       return 0;
+
+  // find next non-dead entry on this page
+  //   it will be the fence key if nothing else
+
+  while( slotptr(set->page, slot)->dead )
+       if( slot++ < set->page->cnt )
+         continue;
+       else
+         return bt->err = BTERR_struct, 0;
+
+  page_no = bt_getid(valptr(set->page, slot)->value);
+  prevlatch = set->latch;
+
+  while( page_no ) {
+       if( set->latch = bt_pinlatch (bt, page_no, 1) )
+         set->page = bt_mappage (bt, set->latch);
+       else
+         return 0;
+
+       if( set->page->free || set->page->lvl )
+               return bt->err = BTERR_struct, 0;
+
+       // obtain read lock using lock chaining with Access mode
+       //      release & unpin parent/left sibling page
+
+       bt_lockpage(BtLockAccess, set->latch);
+
+       bt_unlockpage(BtLockRead, prevlatch);
+       bt_unpinlatch (prevlatch);
+
+       bt_lockpage(BtLockRead, set->latch);
+       bt_unlockpage(BtLockAccess, set->latch);
+
+       //  find key on page at this level
+       //  and descend to requested level
+
+       if( !set->page->kill )
+        if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
+         bt_unlockpage(BtLockRead, set->latch);
+         bt_lockpage(BtLockAtomic, set->latch);
+         bt_lockpage(BtLockAccess, set->latch);
+         bt_lockpage(BtLockRead, set->latch);
+         bt_unlockpage(BtLockAccess, set->latch);
+
+         if( slot = bt_findslot (set->page, key, len) )
+               return slot;
+
+         bt_unlockpage(BtLockAtomic, set->latch);
+         }
+
+       //  slide right into next page
+
+       page_no = bt_getid(set->page->right);
+       prevlatch = set->latch;
+  }
+
+  // return error on end of right chain
+
+  bt->err = BTERR_struct;
+  return 0;    // return error
+}
+
 //     determine actual page where key is located
 //  return slot number
 
@@ -2352,6 +2405,18 @@ BtVal *val;
        return 0;
 }
 
+uint bt_parentmatch (AtomicKey *head, uint entry)
+{
+uint cnt = 0;
+
+       if( head )
+         do if( head->entry == entry )
+               head->nounlock = 1, cnt++;
+         while( head = head->next );
+
+       return cnt;
+}
+
 //     atomic modification of a batch of keys.
 
 //     return -1 if BTERR is set
@@ -2415,7 +2480,7 @@ int type;
                bt_unlockpage(BtLockRead, set->latch); 
 
        if( !slot )
-         if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) )
+         if( slot = bt_atomicload(bt, set, key->key, key->len) )
                set->latch->split = 0;
          else
                return -1;
@@ -2551,12 +2616,11 @@ int type;
          //  schedule prev fence key update
 
          ptr = keyptr(prev->page,prev->page->cnt);
-         leaf = malloc (sizeof(AtomicKey));
+         leaf = calloc (sizeof(AtomicKey), 1);
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
          leaf->page_no = prev->latch->page_no;
          leaf->entry = prev->latch->entry;
-         leaf->next = NULL;
          leaf->type = 0;
 
          if( tail )
@@ -2596,12 +2660,11 @@ int type;
          //  Process last page split in chain
 
          ptr = keyptr(prev->page,prev->page->cnt);
-         leaf = malloc (sizeof(AtomicKey));
+         leaf = calloc (sizeof(AtomicKey), 1);
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
          leaf->page_no = prev->latch->page_no;
          leaf->entry = prev->latch->entry;
-         leaf->next = NULL;
          leaf->type = 0;
   
          if( tail )
@@ -2632,12 +2695,11 @@ int type;
        // Delete empty master's fence
 
        ptr = keyptr(prev->page,prev->page->cnt);
-       leaf = malloc (sizeof(AtomicKey));
+       leaf = calloc (sizeof(AtomicKey), 1);
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
        leaf->page_no = prev->latch->page_no;
        leaf->entry = prev->latch->entry;
-       leaf->next = NULL;
        leaf->type = 1;
   
        if( tail )
@@ -2650,14 +2712,13 @@ int type;
        bt_lockpage(BtLockParent, prev->latch);
        bt_unlockpage(BtLockWrite, prev->latch);
 
+       // grab master's right sibling
+
        if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
                set->page = bt_mappage (bt, set->latch);
        else
                return -1;
 
-       //      add page to our transaction
-
-       bt_lockpage(BtLockAtomic, set->latch);
        bt_lockpage(BtLockWrite, set->latch);
 
        //      pull contents over empty page
@@ -2669,18 +2730,24 @@ int type;
        set->latch->dirty = 1;
        set->page->kill = 1;
 
-       //      add new parent key for new master page contents
-       //      delete it after parent posts the new master fence.
+       //      add new fence key for new master page contents, delete
+       //      right sibling after parent posts the new master fence.
 
        ptr = keyptr(set->page,set->page->cnt);
-       leaf = malloc (sizeof(AtomicKey));
+       leaf = calloc (sizeof(AtomicKey), 1);
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
        leaf->page_no = prev->latch->page_no;
        leaf->entry = set->latch->entry;
-       leaf->next = NULL;
        leaf->type = 2;
   
+       //  see if right sibling is not yet in the FIFO
+
+       if( !bt_parentmatch (head, leaf->entry) )
+         bt_lockpage(BtLockParent, set->latch);
+
+       bt_unlockpage(BtLockWrite, set->latch);
+
        if( tail )
          tail->next = leaf;
        else
@@ -2688,9 +2755,6 @@ int type;
 
        tail = leaf;
 
-//     bt_lockpage(BtLockParent, set->latch);
-       bt_unlockpage(BtLockWrite, set->latch);
-
        //  fix master's far right sibling's left pointer
 
        if( right = bt_getid (set->page->right) ) {
@@ -2723,27 +2787,27 @@ int type;
            if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
                  return -1;
 
-           bt_unlockpage (BtLockParent, set->latch);
                break;
 
          case 1:       // delete key
                if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
                  return -1;
 
-           bt_unlockpage (BtLockParent, set->latch);
                break;
 
          case 2:       // insert key & free
            if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
                  return -1;
 
-               bt_unlockpage (BtLockAtomic, set->latch);
                bt_lockpage (BtLockDelete, set->latch);
                bt_lockpage (BtLockWrite, set->latch);
                bt_freepage (bt, set);
                break;
          }
 
+         if( !leaf->nounlock )
+           bt_unlockpage (BtLockParent, set->latch);
+
          bt_unpinlatch (set->latch);
          tail = leaf->next;
          free (leaf);
@@ -3115,7 +3179,7 @@ FILE *in;
                          nxt -= 1;
                          txn[nxt] = 10;
                          slotptr(page,++cnt)->off  = nxt;
-                         slotptr(page,cnt)->type = Unique;
+                         slotptr(page,cnt)->type = Delete;
                          len = 0;
 
                          if( cnt < args->num )