]> pd.if.org Git - btree/commitdiff
Fix a couple of bugs in atomicfree
authorunknown <karl@E04.petzent.com>
Sun, 28 Sep 2014 20:32:36 +0000 (13:32 -0700)
committerunknown <karl@E04.petzent.com>
Sun, 28 Sep 2014 20:32:36 +0000 (13:32 -0700)
threadskv8.c

index 91ec929670d8fd4ed6a27cd4fe040f8c9c038ec5..16ad09f024b44cef1ca8971a0e1b724bf52bd9e3 100644 (file)
@@ -1395,6 +1395,7 @@ void bt_freepage (BtDb *bt, BtPageSet *set)
 
        bt_unlockpage (BtLockDelete, set->latch);
        bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
 
        // unlock allocation page
 
@@ -1476,7 +1477,6 @@ uint idx;
        root->latch->dirty = 1;
 
        bt_freepage (bt, child);
-       bt_unpinlatch (child->latch);
 
   } while( root->page->lvl > 1 && root->page->act == 1 );
 
@@ -1563,7 +1563,6 @@ BtKey *ptr;
        bt_lockpage (BtLockDelete, right->latch);
        bt_lockpage (BtLockWrite, right->latch);
        bt_freepage (bt, right);
-       bt_unpinlatch (right->latch);
 
        bt_unlockpage (BtLockParent, set->latch);
        bt_unpinlatch (set->latch);
@@ -2305,9 +2304,6 @@ uint slot;
        else
          return 0;
 
-       if( set->page->free || set->page->lvl )
-               return bt->err = BTERR_struct, 0;
-
        // obtain read lock using lock chaining with Access mode
        //      release & unpin parent/left sibling page
 
@@ -2325,12 +2321,13 @@ uint slot;
        if( !set->page->kill )
         if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
          bt_unlockpage(BtLockRead, set->latch);
-         bt_lockpage(BtLockAtomic, set->latch);
          bt_lockpage(BtLockAccess, set->latch);
+         bt_lockpage(BtLockAtomic, set->latch);
          bt_lockpage(BtLockRead, set->latch);
          bt_unlockpage(BtLockAccess, set->latch);
 
-         if( slot = bt_findslot (set->page, key, len) )
+         if( !set->page->kill )
+          if( slot = bt_findslot (set->page, key, len) )
                return slot;
 
          bt_unlockpage(BtLockAtomic, set->latch);
@@ -2437,16 +2434,88 @@ BtVal *val;
        return 0;
 }
 
-uint bt_parentmatch (AtomicKey *head, uint entry)
+//     delete an empty master page for a transaction
+
+//     note that the far right page never empties because
+//     it always contains (at least) the infinite stopper key
+//     and that all pages that don't contain any keys are
+//     deleted, or are being held under Atomic lock.
+
+BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
 {
-uint cnt = 0;
+BtPageSet right[1], temp[1];
+unsigned char value[BtId];
+uid right_page_no;
+BtKey *ptr;
+
+       bt_lockpage(BtLockWrite, prev->latch);
+
+       //      grab the right sibling
+
+       if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
+               right->page = bt_mappage (bt, right->latch);
+       else
+               return bt->err;
+
+       bt_lockpage(BtLockAtomic, right->latch);
+       bt_lockpage(BtLockWrite, right->latch);
+
+       //      and pull contents over empty page
+       //      while preserving master's left link
+
+       memcpy (right->page->left, prev->page->left, BtId);
+       memcpy (prev->page, right->page, bt->mgr->page_size);
+
+       //      forward seekers to old right sibling
+       //      to new page location in set
+
+       bt_putid (right->page->right, prev->latch->page_no);
+       right->latch->dirty = 1;
+       right->page->kill = 1;
+
+       //      remove pointer to right page for searchers by
+       //      changing right fence key to point to the master page
+
+       ptr = keyptr(right->page,right->page->cnt);
+       bt_putid (value, prev->latch->page_no);
+
+       if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+               return bt->err;
+
+       //  now that master page is in good shape we can
+       //      remove its locks.
+
+       bt_unlockpage (BtLockAtomic, prev->latch);
+       bt_unlockpage (BtLockWrite, prev->latch);
+
+       //  fix master's right sibling's left pointer
+       //      to remove scanner's poiner to the right page
+
+       if( right_page_no = bt_getid (prev->page->right) ) {
+         if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
+               temp->page = bt_mappage (bt, temp->latch);
+
+         bt_lockpage (BtLockWrite, temp->latch);
+         bt_putid (temp->page->left, prev->latch->page_no);
+         temp->latch->dirty = 1;
+
+         bt_unlockpage (BtLockWrite, temp->latch);
+         bt_unpinlatch (temp->latch);
+       } else {        // master is now the far right page
+         bt_spinwritelock (bt->mgr->lock);
+         bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
+         bt_spinreleasewrite(bt->mgr->lock);
+       }
 
-       if( head )
-         do if( head->entry == entry )
-               head->nounlock = 1, cnt++;
-         while( head = head->next );
+       //      now that there are no pointers to the right page
+       //      we can delete it after the last read access occurs
 
-       return cnt;
+       bt_unlockpage (BtLockWrite, right->latch);
+       bt_unlockpage (BtLockAtomic, right->latch);
+       bt_lockpage (BtLockDelete, right->latch);
+       bt_lockpage (BtLockWrite, right->latch);
+       bt_freepage (bt, right);
+       return 0;
 }
 
 //     atomic modification of a batch of keys.
@@ -2613,6 +2682,7 @@ int type;
        samepage = src;
 
        //  pick-up all splits from master page
+       //      each one is already WriteLocked.
 
        entry = prev->latch->split;
 
@@ -2621,14 +2691,15 @@ int type;
          set->page = bt_mappage (bt, set->latch);
          entry = set->latch->split;
 
-         // delete empty master page
+         // delete empty master page by undoing its split
+         //  (this is potentially another empty page)
+         //  note that there are no new left pointers yet
 
          if( !prev->page->act ) {
                memcpy (set->page->left, prev->page->left, BtId);
                memcpy (prev->page, set->page, bt->mgr->page_size);
                bt_lockpage (BtLockDelete, set->latch);
                bt_freepage (bt, set);
-               bt_unpinlatch (set->latch);
 
                prev->latch->dirty = 1;
                continue;
@@ -2641,7 +2712,6 @@ int type;
                prev->latch->split = set->latch->split;
                bt_lockpage (BtLockDelete, set->latch);
                bt_freepage (bt, set);
-               bt_unpinlatch (set->latch);
                continue;
          }
 
@@ -2662,6 +2732,8 @@ int type;
 
          tail = leaf;
 
+         // splice in the left link into the split page
+
          bt_putid (set->page->left, prev->latch->page_no);
          bt_lockpage(BtLockParent, prev->latch);
          bt_unlockpage(BtLockWrite, prev->latch);
@@ -2672,11 +2744,11 @@ int type;
        //      (if all splits were reversed, latch->split == 0)
 
        if( latch->split ) {
-         //  fix left pointer in master's original (now split) right sibling
-         //    or set rightmost page in page zero
+         //  fix left pointer in master's original (now split)
+         //  far right sibling or set rightmost page in page zero
 
          if( right = bt_getid (prev->page->right) ) {
-               if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
+               if( set->latch = bt_pinlatch (bt, right, 1) )
                  set->page = bt_mappage (bt, set->latch);
                else
                  return -1;
@@ -2731,71 +2803,24 @@ int type;
        // master page located in prev is empty, delete it
        // by pulling over master's right sibling.
 
-       // Delete empty master's fence key
+       // Remove empty master's fence key
 
        ptr = keyptr(prev->page,prev->page->cnt);
-       leaf = calloc (sizeof(AtomicKey), 1);
-
-       memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
-       leaf->page_no = prev->latch->page_no;
-       leaf->entry = prev->latch->entry;
-       leaf->type = 1;
-  
-       if( tail )
-         tail->next = leaf;
-       else
-         head = leaf;
-
-       tail = leaf;
-
-       bt_lockpage(BtLockParent, prev->latch);
-       bt_unlockpage(BtLockWrite, prev->latch);
 
-       // grab master's right sibling
-       //      note that the far right page never empties because
-       //      it always contains (at least) the infinite stopper key
-       //      and that all pages that don't contain any keys have
-       //      been deleted, or are being deleted under write lock.
-
-       if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
-               set->page = bt_mappage (bt, set->latch);
-       else
+       if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
                return -1;
 
-       bt_lockpage(BtLockWrite, set->latch);
-
-       //      and pull contents over empty page
-       //      while preserving master's left link
-
-       memcpy (set->page->left, prev->page->left, BtId);
-       memcpy (prev->page, set->page, bt->mgr->page_size);
-
-       //      forward seekers to old right sibling
-       //      to new page location in prev
+       //      perform the remainder of the delete
+       //      from the FIFO queue
 
-       bt_putid (set->page->right, prev->latch->page_no);
-       set->latch->dirty = 1;
-       set->page->kill = 1;
-
-       //      add new fence key for new master page contents, delete
-       //      right sibling after parent posts the new master fence.
-
-       ptr = keyptr(set->page,set->page->cnt);
        leaf = calloc (sizeof(AtomicKey), 1);
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
        leaf->page_no = prev->latch->page_no;
-       leaf->entry = set->latch->entry;
+       leaf->entry = prev->latch->entry;
+       leaf->nounlock = 1;
        leaf->type = 2;
   
-       //  see if right sibling key update is in the FIFO,
-       //      and ParentLock if not there.
-
-       if( !bt_parentmatch (head, leaf->entry) )
-         bt_lockpage(BtLockParent, set->latch);
-
-       bt_unlockpage(BtLockWrite, set->latch);
-
        if( tail )
          tail->next = leaf;
        else
@@ -2803,25 +2828,10 @@ int type;
 
        tail = leaf;
 
-       //  fix new master's right sibling's left pointer
-
-       if( right = bt_getid (set->page->right) ) {
-         if( set->latch = bt_pinlatch (bt, right, 1) )
-               set->page = bt_mappage (bt, set->latch);
-
-         bt_lockpage (BtLockWrite, set->latch);
-         bt_putid (set->page->left, prev->latch->page_no);
-         set->latch->dirty = 1;
+       //      leave atomic lock in place until
+       //      deletion completes in next phase.
 
-         bt_unlockpage (BtLockWrite, set->latch);
-         bt_unpinlatch (set->latch);
-       } else {        // master is now the far right page
-         bt_spinwritelock (bt->mgr->lock);
-         bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
-         bt_spinreleasewrite(bt->mgr->lock);
-       }
-
-       bt_unlockpage(BtLockAtomic, latch);
+       bt_unlockpage(BtLockWrite, prev->latch);
   }
   
   //  add & delete keys for any pages split or merged during transaction
@@ -2847,13 +2857,10 @@ int type;
 
                break;
 
-         case 2:       // insert key & free
-           if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+         case 2:       // free page
+               if( bt_atomicfree (bt, set) )
                  return -1;
 
-               bt_lockpage (BtLockDelete, set->latch);
-               bt_lockpage (BtLockWrite, set->latch);
-               bt_freepage (bt, set);
                break;
          }