]> pd.if.org Git - btree/commitdiff
Several deletion bugs fixed
authorunknown <karl@E04.petzent.com>
Mon, 3 Nov 2014 19:42:20 +0000 (11:42 -0800)
committerunknown <karl@E04.petzent.com>
Mon, 3 Nov 2014 19:42:20 +0000 (11:42 -0800)
threadskv10g.c

index 6b3de970cdbd0506d45c09a871572e753c89ab90..6f1d39a43b65703137dc13fee8de0785e359cc30 100644 (file)
@@ -1670,16 +1670,11 @@ BtKey *ptr;
   //  start at root of btree and drill down
 
   do {
-       // determine lock mode of drill level
-       mode = (drill == lvl) ? lock : BtLockRead; 
-
        if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
          set->page = bt_mappage (mgr, set->latch);
        else
          return 0;
 
-       // obtain access lock using lock chaining with Access mode
-
        if( page_no > ROOT_page )
          bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
 
@@ -1692,7 +1687,9 @@ BtKey *ptr;
        }
 
        // obtain mode lock using lock coupling through AccessLock
+       // determine lock mode of drill level
 
+       mode = (drill == lvl) ? lock : BtLockRead; 
        bt_lockpage(mode, set->latch, thread_no, __LINE__);
 
        // grab our fence key
@@ -1788,6 +1785,10 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
 {
 unsigned char *freechain;
 
+       //      lock allocation page
+
+       bt_mutexlock (mgr->lock);
+
        if( set->page->lvl ) {
                freechain = mgr->pagezero->freechain;
                mgr->pagezero->upperpages--;
@@ -1796,10 +1797,6 @@ unsigned char *freechain;
                mgr->pagezero->leafpages--;
        }
 
-       //      lock allocation page
-
-       bt_mutexlock (mgr->lock);
-
        //      store chain
 
        memcpy(set->page->right, freechain, BtId);
@@ -1922,13 +1919,13 @@ BtKey *ptr;
        if( !lvl )
                page_size <<= mgr->leaf_xtra;
 
-       //  cache copy of original fence key
-       //      that is being deleted.
+       //  cache original copy of original fence key
+       //      that is going to be deleted.
 
        ptr = keyptr(set->page, set->page->cnt);
        memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
 
-       //      obtain locks on right page
+       //      pin and lock our right page
 
        page_no = bt_getid(set->page->right);
 
@@ -1942,16 +1939,15 @@ BtKey *ptr;
        if( right->page->kill )
                return mgr->line = __LINE__, mgr->err = BTERR_struct;
 
-       // pull contents of right peer into our empty page
+       // pull contents of right sibling over our empty page
        //      preserving our left page number, and its right page number.
 
        bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
        page_no = bt_getid(set->page->left);
        memcpy (set->page, right->page, page_size);
        bt_putid (set->page->left, page_no);
-       bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
-
        set->page->page_no = set->latch->page_no;
+       bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
 
        //  fix left link from far right page
 
@@ -1971,31 +1967,33 @@ BtKey *ptr;
          bt_releasemutex(mgr->lock);
        }
 
-       // mark right page deleted and release lock
+       // mark right page as being deleted and release lock
 
        right->page->kill = 1;
        bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
 
-       // redirect higher key directly to our new node contents
+       // redirect the new higher key directly to our new node
 
-       ptr = keyptr(right->page, right->page->cnt);
+       ptr = keyptr(set->page, set->page->cnt);
        bt_putid (value, set->latch->page_no);
 
        if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
          return mgr->err;
 
-       //      delete our orignal fence key in parent
-       //      and unlock our page.
+       //      delete our original fence key in parent
 
        ptr = (BtKey *)lowerfence;
 
        if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
          return mgr->err;
 
+       //      release write lock to our node
+
        bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
        bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
 
-       //      obtain delete and write locks to right node
+       //  wait for all access to drain away with delete lock,
+       //      then obtain write lock to right node and free it.
 
        bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
        bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
@@ -2119,7 +2117,7 @@ BtVal *val;
        page->act = 0;
 
        // clean up page first by
-       // removing deleted keys
+       // removing dead keys
 
        while( cnt++ < max ) {
                if( cnt == slot )
@@ -2153,7 +2151,7 @@ BtVal *val;
                slotptr(page, idx)->type = slotptr(frame, cnt)->type;
 
                if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
-                       page->act++;
+                 page->act++;
        }
 
        page->cnt = idx;
@@ -2309,7 +2307,7 @@ uint prev;
                slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
 
                if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
-                       frame->act++;
+                 frame->act++;
        }
 
        frame->cnt = idx;
@@ -2763,38 +2761,67 @@ uint entry, slot;
 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
 {
 BtKey *key = keyptr(source, src);
+uint idx, slot, entry;
+BtLatchSet *latch;
 BtPageSet set[1];
-uint idx, slot;
 BtSlot *node;
 BtKey *ptr;
 BtVal *val;
 
-       if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
-         node = slotptr(set->page, slot);
-         ptr = keyptr(set->page, slot);
-         val = valptr(set->page, slot);
-       } else
-         return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
+  while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
+       node = slotptr(set->page, slot);
+       ptr = keyptr(set->page, slot);
+       val = valptr(set->page, slot);
 
-       //  if slot is not found, insert a delete slot
+       //  if slot is not found on cache btree, insert a delete slot
+       //      otherwise ignore the request.
 
        if( keycmp (ptr, key->key, key->len) )
-         if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
-               return mgr->err;
+        if( !mgr->type )
+         if( slot = bt_cleanpage(mgr, set, key->len, slot, 0) )
+               return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete);
+         else { //  split page before inserting Delete slot
+               if( entry = bt_splitpage (mgr, set, thread_no, 0) )
+                 latch = mgr->leafsets + entry;
+               else
+             return mgr->err;
+
+               //      splice right page into split chain
+               //      and WriteLock it
+
+               bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
+               latch->split = set->latch->split;
+               set->latch->split = entry;
+
+               // clear slot number for atomic page
+
+               locks[src].slot = 0;
+               continue;
+         }
+        else
+          return 0;
 
        //      if node is already dead,
        //      ignore the request.
 
-       if( node->dead )
+       if( node->type == Delete || node->dead )
                return 0;
 
-       set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
-       set->page->lsn = lsn;
-       set->page->act--;
+       node->type = Delete;
+
+       //  if main LSM btree, delete the slot
+
+       if( mgr->type ) {
+               set->page->act--;
+               node->dead = 1;
+       }
 
-       node->dead = 0;
        __sync_fetch_and_add(&mgr->found, 1);
+       set->page->lsn = lsn;
        return 0;
+  }
+
+  return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
 }
 
 //     release master's splits from right to left
@@ -3121,9 +3148,9 @@ BtVal *val;
          continue;
        }
 
-       // entry interiour node or being killed or constructed
+       // entry is being killed or constructed
 
-       if( set->page->lvl || set->page->nopromote || set->page->kill ) {
+       if( set->page->nopromote || set->page->kill ) {
          bt_releasemutex(set->latch->modify);
          continue;
        }
@@ -3439,6 +3466,7 @@ int cmp;
          return 0;
 
        //  main key is larger
+       //      return smaller key
 
        if( cmp < 0 ) {
          bt->phase = 0;