]> pd.if.org Git - btree/blobdiff - threadskv8.c
Fix bug in bt_cleanpage and bt_splitpage
[btree] / threadskv8.c
index 91ec929670d8fd4ed6a27cd4fe040f8c9c038ec5..7cf31e57b8af4d822b501049c4ab3b887611d4f1 100644 (file)
@@ -275,6 +275,7 @@ typedef struct {
        uint latchtotal;                        // number of page latch entries
        uint latchhash;                         // number of latch hash table slots
        uint latchvictim;                       // next latch entry to examine
+       ushort thread_no[1];            // next thread number
        BtHashEntry *hashtable;         // the buffer pool hash table entries
        BtLatchSet *latchsets;          // mapped latch set from buffer pool
        unsigned char *pagepool;        // mapped to the buffer pool pages
@@ -294,6 +295,7 @@ typedef struct {
        int found;                              // last delete or insert was found
        int err;                                // last error
        int reads, writes;              // number of reads and writes from the btree
+       ushort thread_no;               // thread number
 } BtDb;
 
 typedef enum {
@@ -1118,6 +1120,11 @@ BtDb *bt = malloc (sizeof(*bt));
 #endif
        bt->frame = (BtPage)bt->mem;
        bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
+#ifdef unix
+       bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
+#else
+       bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
+#endif
        return bt;
 }
 
@@ -1145,7 +1152,7 @@ int ans;
 
 // place write, read, or parent lock on requested page_no.
 
-void bt_lockpage(BtLock mode, BtLatchSet *latch)
+void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
 {
        switch( mode ) {
        case BtLockRead:
@@ -1171,7 +1178,7 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch)
 
 // remove write, read, or parent lock on requested page
 
-void bt_unlockpage(BtLock mode, BtLatchSet *latch)
+void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
 {
        switch( mode ) {
        case BtLockRead:
@@ -1298,27 +1305,27 @@ uint mode, prevmode;
        // obtain access lock using lock chaining with Access mode
 
        if( page_no > ROOT_page )
-         bt_lockpage(BtLockAccess, set->latch);
+         bt_lockpage(bt, BtLockAccess, set->latch);
 
        set->page = bt_mappage (bt, set->latch);
 
        //      release & unpin parent or left sibling page
 
        if( prevpage ) {
-         bt_unlockpage(prevmode, prevlatch);
+         bt_unlockpage(bt, prevmode, prevlatch);
          bt_unpinlatch (prevlatch);
          prevpage = 0;
        }
 
        // obtain mode lock using lock chaining through AccessLock
 
-       bt_lockpage(mode, set->latch);
+       bt_lockpage(bt, mode, set->latch);
 
        if( set->page->free )
                return bt->err = BTERR_struct, 0;
 
        if( page_no > ROOT_page )
-         bt_unlockpage(BtLockAccess, set->latch);
+         bt_unlockpage(bt, BtLockAccess, set->latch);
 
        // re-read and re-lock root after determining actual level of root
 
@@ -1329,7 +1336,7 @@ uint mode, prevmode;
                drill = set->page->lvl;
 
                if( lock != BtLockRead && drill == lvl ) {
-                 bt_unlockpage(mode, set->latch);
+                 bt_unlockpage(bt, mode, set->latch);
                  bt_unpinlatch (set->latch);
                  continue;
                }
@@ -1342,10 +1349,8 @@ uint mode, prevmode;
        //  find key on page at this level
        //  and descend to requested level
 
-       if( set->page->kill )
-         goto slideright;
-
-       if( slot = bt_findslot (set->page, key, len) ) {
+       if( !set->page->kill )
+        if( slot = bt_findslot (set->page, key, len) ) {
          if( drill == lvl )
                return slot;
 
@@ -1360,13 +1365,11 @@ uint mode, prevmode;
          page_no = bt_getid(valptr(set->page, slot)->value);
          drill--;
          continue;
-       }
+        }
 
        //  or slide right into next page
 
-slideright:
        page_no = bt_getid(set->page->right);
-
   } while( page_no );
 
   // return error on end of right chain
@@ -1393,8 +1396,9 @@ void bt_freepage (BtDb *bt, BtPageSet *set)
 
        // unlock released page
 
-       bt_unlockpage (BtLockDelete, set->latch);
-       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unlockpage (bt, BtLockDelete, set->latch);
+       bt_unlockpage (bt, BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
 
        // unlock allocation page
 
@@ -1423,8 +1427,8 @@ uint idx;
        ptr = keyptr(set->page, set->page->cnt);
        memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
 
-       bt_lockpage (BtLockParent, set->latch);
-       bt_unlockpage (BtLockWrite, set->latch);
+       bt_lockpage (bt, BtLockParent, set->latch);
+       bt_unlockpage (bt, BtLockWrite, set->latch);
 
        //      insert new (now smaller) fence key
 
@@ -1441,7 +1445,7 @@ uint idx;
        if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
                return bt->err;
 
-       bt_unlockpage (BtLockParent, set->latch);
+       bt_unlockpage (bt, BtLockParent, set->latch);
        bt_unpinlatch(set->latch);
        return 0;
 }
@@ -1469,18 +1473,17 @@ uint idx;
        else
                return bt->err;
 
-       bt_lockpage (BtLockDelete, child->latch);
-       bt_lockpage (BtLockWrite, child->latch);
+       bt_lockpage (bt, BtLockDelete, child->latch);
+       bt_lockpage (bt, BtLockWrite, child->latch);
 
        memcpy (root->page, child->page, bt->mgr->page_size);
        root->latch->dirty = 1;
 
        bt_freepage (bt, child);
-       bt_unpinlatch (child->latch);
 
   } while( root->page->lvl > 1 && root->page->act == 1 );
 
-  bt_unlockpage (BtLockWrite, root->latch);
+  bt_unlockpage (bt, BtLockWrite, root->latch);
   bt_unpinlatch (root->latch);
   return 0;
 }
@@ -1513,7 +1516,7 @@ BtKey *ptr;
        else
                return 0;
 
-       bt_lockpage (BtLockWrite, right->latch);
+       bt_lockpage (bt, BtLockWrite, right->latch);
 
        // cache copy of key to update
 
@@ -1536,11 +1539,11 @@ BtKey *ptr;
        right->latch->dirty = 1;
        right->page->kill = 1;
 
-       bt_lockpage (BtLockParent, right->latch);
-       bt_unlockpage (BtLockWrite, right->latch);
+       bt_lockpage (bt, BtLockParent, right->latch);
+       bt_unlockpage (bt, BtLockWrite, right->latch);
 
-       bt_lockpage (BtLockParent, set->latch);
-       bt_unlockpage (BtLockWrite, set->latch);
+       bt_lockpage (bt, BtLockParent, set->latch);
+       bt_unlockpage (bt, BtLockWrite, set->latch);
 
        // redirect higher key directly to our new node contents
 
@@ -1559,13 +1562,12 @@ BtKey *ptr;
 
        //      obtain delete and write locks to right node
 
-       bt_unlockpage (BtLockParent, right->latch);
-       bt_lockpage (BtLockDelete, right->latch);
-       bt_lockpage (BtLockWrite, right->latch);
+       bt_unlockpage (bt, BtLockParent, right->latch);
+       bt_lockpage (bt, BtLockDelete, right->latch);
+       bt_lockpage (bt, BtLockWrite, right->latch);
        bt_freepage (bt, right);
-       bt_unpinlatch (right->latch);
 
-       bt_unlockpage (BtLockParent, set->latch);
+       bt_unlockpage (bt, BtLockParent, set->latch);
        bt_unpinlatch (set->latch);
        bt->found = 1;
        return 0;
@@ -1634,7 +1636,7 @@ BtVal *val;
                return bt_deletepage (bt, set);
 
        set->latch->dirty = 1;
-       bt_unlockpage(BtLockWrite, set->latch);
+       bt_unlockpage(bt, BtLockWrite, set->latch);
        bt_unpinlatch (set->latch);
        return bt->found = found, 0;
 }
@@ -1666,13 +1668,13 @@ uid page_no;
 
        // obtain access lock using lock chaining with Access mode
 
-       bt_lockpage(BtLockAccess, set->latch);
+       bt_lockpage(bt, BtLockAccess, set->latch);
 
-       bt_unlockpage(BtLockRead, prevlatch);
+       bt_unlockpage(bt, BtLockRead, prevlatch);
        bt_unpinlatch (prevlatch);
 
-       bt_lockpage(BtLockRead, set->latch);
-       bt_unlockpage(BtLockAccess, set->latch);
+       bt_lockpage(bt, BtLockRead, set->latch);
+       bt_unlockpage(bt, BtLockAccess, set->latch);
        return 1;
 }
 
@@ -1730,7 +1732,7 @@ BtVal *val;
 
    } while( slot = bt_findnext (bt, set, slot) );
 
-  bt_unlockpage (BtLockRead, set->latch);
+  bt_unlockpage (bt, BtLockRead, set->latch);
   bt_unpinlatch (set->latch);
   return ret;
 }
@@ -1775,7 +1777,9 @@ BtVal *val;
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 2;
-               if( cnt < max && slotptr(bt->frame,cnt)->dead )
+
+               if( cnt < max || bt->frame->lvl )
+                 if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy the value across
@@ -1792,11 +1796,9 @@ BtVal *val;
 
                // make a librarian slot
 
-               if( idx ) {
-                       slotptr(page, ++idx)->off = nxt;
-                       slotptr(page, idx)->type = Librarian;
-                       slotptr(page, idx)->dead = 1;
-               }
+               slotptr(page, ++idx)->off = nxt;
+               slotptr(page, idx)->type = Librarian;
+               slotptr(page, idx)->dead = 1;
 
                // set up the slot
 
@@ -1886,7 +1888,7 @@ BtVal *val;
 
        // release and unpin root pages
 
-       bt_unlockpage(BtLockWrite, root->latch);
+       bt_unlockpage(bt, BtLockWrite, root->latch);
        bt_unpinlatch (root->latch);
 
        bt_unpinlatch (right);
@@ -1916,8 +1918,10 @@ uint prev;
        idx = 0;
 
        while( cnt++ < max ) {
-               if( slotptr(set->page, cnt)->dead && cnt < max )
+               if( cnt < max || set->page->lvl )
+                 if( slotptr(set->page, cnt)->dead )
                        continue;
+
                src = valptr(set->page, cnt);
                nxt -= src->len + sizeof(BtVal);
                memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
@@ -1929,11 +1933,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(bt->frame, ++idx)->off = nxt;
-                       slotptr(bt->frame, idx)->type = Librarian;
-                       slotptr(bt->frame, idx)->dead = 1;
-               }
+               slotptr(bt->frame, ++idx)->off = nxt;
+               slotptr(bt->frame, idx)->type = Librarian;
+               slotptr(bt->frame, idx)->dead = 1;
 
                //  add actual slot
 
@@ -2035,10 +2037,10 @@ BtKey *ptr;
 
        // insert new fences in their parent pages
 
-       bt_lockpage (BtLockParent, right);
+       bt_lockpage (bt, BtLockParent, right);
 
-       bt_lockpage (BtLockParent, set->latch);
-       bt_unlockpage (BtLockWrite, set->latch);
+       bt_lockpage (bt, BtLockParent, set->latch);
+       bt_unlockpage (bt, BtLockWrite, set->latch);
 
        // insert new fence for reformulated left block of smaller keys
 
@@ -2056,10 +2058,10 @@ BtKey *ptr;
        if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
                return bt->err;
 
-       bt_unlockpage (BtLockParent, set->latch);
+       bt_unlockpage (bt, BtLockParent, set->latch);
        bt_unpinlatch (set->latch);
 
-       bt_unlockpage (BtLockParent, right);
+       bt_unlockpage (bt, BtLockParent, right);
        bt_unpinlatch (right);
        return 0;
 }
@@ -2132,7 +2134,7 @@ BtVal *val;
        node->dead = 0;
 
        if( release ) {
-               bt_unlockpage (BtLockWrite, set->latch);
+               bt_unlockpage (bt, BtLockWrite, set->latch);
                bt_unpinlatch (set->latch);
        }
 
@@ -2217,7 +2219,7 @@ uint type;
                slotptr(set->page, slot)->dead = 0;
                val->len = vallen;
                memcpy (val->value, value, vallen);
-               bt_unlockpage(BtLockWrite, set->latch);
+               bt_unlockpage(bt, BtLockWrite, set->latch);
                bt_unpinlatch (set->latch);
                return 0;
        }
@@ -2251,7 +2253,7 @@ uint type;
        ptr->len = keylen;
        
        slotptr(set->page, slot)->off = set->page->min;
-       bt_unlockpage(BtLockWrite, set->latch);
+       bt_unlockpage(bt, BtLockWrite, set->latch);
        bt_unpinlatch (set->latch);
        return 0;
   }
@@ -2262,7 +2264,7 @@ typedef struct {
        uint entry;                     // latch table entry number
        uint slot:31;           // page slot number
        uint reuse:1;           // reused previous page
-} AtomicMod;
+} AtomicTxn;
 
 typedef struct {
        uid page_no;            // page number for split leaf
@@ -2282,7 +2284,7 @@ BtLatchSet *prevlatch;
 uid page_no;
 uint slot;
 
-  //  find level zero page
+  //  find level one slot
 
   if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
        return 0;
@@ -2305,39 +2307,37 @@ uint slot;
        else
          return 0;
 
-       if( set->page->free || set->page->lvl )
-               return bt->err = BTERR_struct, 0;
-
        // obtain read lock using lock chaining with Access mode
        //      release & unpin parent/left sibling page
 
-       bt_lockpage(BtLockAccess, set->latch);
+       bt_lockpage(bt, BtLockAccess, set->latch);
 
-       bt_unlockpage(BtLockRead, prevlatch);
+       bt_unlockpage(bt, BtLockRead, prevlatch);
        bt_unpinlatch (prevlatch);
 
-       bt_lockpage(BtLockRead, set->latch);
-       bt_unlockpage(BtLockAccess, set->latch);
+       bt_lockpage(bt, BtLockRead, set->latch);
 
        //  find key on page at this level
        //  and descend to requested level
 
        if( !set->page->kill )
         if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
-         bt_unlockpage(BtLockRead, set->latch);
-         bt_lockpage(BtLockAtomic, set->latch);
-         bt_lockpage(BtLockAccess, set->latch);
-         bt_lockpage(BtLockRead, set->latch);
-         bt_unlockpage(BtLockAccess, set->latch);
+         bt_unlockpage(bt, BtLockRead, set->latch);
+         bt_lockpage(bt, BtLockAtomic, set->latch);
+         bt_lockpage(bt, BtLockRead, set->latch);
 
-         if( slot = bt_findslot (set->page, key, len) )
+         if( !set->page->kill )
+          if( slot = bt_findslot (set->page, key, len) ) {
+               bt_unlockpage(bt, BtLockAccess, set->latch);
                return slot;
+          }
 
-         bt_unlockpage(BtLockAtomic, set->latch);
+         bt_unlockpage(bt, BtLockAtomic, set->latch);
          }
 
        //  slide right into next page
 
+       bt_unlockpage(bt, BtLockAccess, set->latch);
        page_no = bt_getid(set->page->right);
        prevlatch = set->latch;
   }
@@ -2351,7 +2351,7 @@ uint slot;
 //     determine actual page where key is located
 //  return slot number
 
-uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
+uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
 {
 BtKey *key = keyptr(source,src);
 uint slot = locks[src].slot;
@@ -2368,15 +2368,18 @@ uint entry;
                return slot;
        }
 
-       //      is locks->reuse set?
-       //      if so, find where our key
-       //      is located on previous page or split pages
+       //      is locks->reuse set? or was slot zeroed?
+       //      if so, find where our key is located 
+       //      on current page or pages split on
+       //      same page txn operations.
 
        do {
                set->latch = bt->mgr->latchsets + entry;
                set->page = bt_mappage (bt, set->latch);
 
                if( slot = bt_findslot(set->page, key->key, key->len) ) {
+                 if( slotptr(set->page, slot)->type == Librarian )
+                       slot++;
                  if( locks[src].reuse )
                        locks[src].entry = entry;
                  return slot;
@@ -2387,17 +2390,17 @@ uint entry;
        return 0;
 }
 
-BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
+BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
 {
 BtKey *key = keyptr(source, src);
 BtVal *val = valptr(source, src);
 BtLatchSet *latch;
 BtPageSet set[1];
-uint entry;
+uint entry, slot;
 
-  while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
-       if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
-         return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
+  while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
+       if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
+         return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
 
        if( entry = bt_splitpage (bt, set) )
          latch = bt->mgr->latchsets + entry;
@@ -2407,15 +2410,16 @@ uint entry;
        //      splice right page into split chain
        //      and WriteLock it.
 
+       bt_lockpage(bt, BtLockWrite, latch);
        latch->split = set->latch->split;
        set->latch->split = entry;
-       bt_lockpage(BtLockWrite, latch);
+       locks[src].slot = 0;
   }
 
   return bt->err = BTERR_atomic;
 }
 
-BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
+BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
 {
 BtKey *key = keyptr(source, src);
 uint idx, entry, slot;
@@ -2424,29 +2428,108 @@ BtKey *ptr;
 BtVal *val;
 
        if( slot = bt_atomicpage (bt, source, locks, src, set) )
+         ptr = keyptr(set->page, slot);
+       else
+         return bt->err = BTERR_struct;
+
+       if( !keycmp (ptr, key->key, key->len) )
+         if( !slotptr(set->page, slot)->dead )
                slotptr(set->page, slot)->dead = 1;
+         else
+               return 0;
        else
-               return bt->err = BTERR_struct;
+               return 0;
 
-       ptr = keyptr(set->page, slot);
        val = valptr(set->page, slot);
-
        set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
        set->latch->dirty = 1;
        set->page->act--;
+       bt->found++;
        return 0;
 }
 
-uint bt_parentmatch (AtomicKey *head, uint entry)
+//     delete an empty master page for a transaction
+
+//     note that the far right page never empties because
+//     it always contains (at least) the infinite stopper key
+//     and that all pages that don't contain any keys are
+//     deleted, or are being held under Atomic lock.
+
+BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
 {
-uint cnt = 0;
+BtPageSet right[1], temp[1];
+unsigned char value[BtId];
+uid right_page_no;
+BtKey *ptr;
+
+       bt_lockpage(bt, BtLockWrite, prev->latch);
+
+       //      grab the right sibling
+
+       if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
+               right->page = bt_mappage (bt, right->latch);
+       else
+               return bt->err;
+
+       bt_lockpage(bt, BtLockAtomic, right->latch);
+       bt_lockpage(bt, BtLockWrite, right->latch);
+
+       //      and pull contents over empty page
+       //      while preserving master's left link
+
+       memcpy (right->page->left, prev->page->left, BtId);
+       memcpy (prev->page, right->page, bt->mgr->page_size);
+
+       //      forward seekers to old right sibling
+       //      to new page location in set
+
+       bt_putid (right->page->right, prev->latch->page_no);
+       right->latch->dirty = 1;
+       right->page->kill = 1;
+
+       //      remove pointer to right page for searchers by
+       //      changing right fence key to point to the master page
+
+       ptr = keyptr(right->page,right->page->cnt);
+       bt_putid (value, prev->latch->page_no);
+
+       if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+               return bt->err;
+
+       //  now that master page is in good shape we can
+       //      remove its locks.
+
+       bt_unlockpage (bt, BtLockAtomic, prev->latch);
+       bt_unlockpage (bt, BtLockWrite, prev->latch);
+
+       //  fix master's right sibling's left pointer
+       //      to remove scanner's poiner to the right page
 
-       if( head )
-         do if( head->entry == entry )
-               head->nounlock = 1, cnt++;
-         while( head = head->next );
+       if( right_page_no = bt_getid (prev->page->right) ) {
+         if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
+               temp->page = bt_mappage (bt, temp->latch);
 
-       return cnt;
+         bt_lockpage (bt, BtLockWrite, temp->latch);
+         bt_putid (temp->page->left, prev->latch->page_no);
+         temp->latch->dirty = 1;
+
+         bt_unlockpage (bt, BtLockWrite, temp->latch);
+         bt_unpinlatch (temp->latch);
+       } else {        // master is now the far right page
+         bt_spinwritelock (bt->mgr->lock);
+         bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
+         bt_spinreleasewrite(bt->mgr->lock);
+       }
+
+       //      now that there are no pointers to the right page
+       //      we can delete it after the last read access occurs
+
+       bt_unlockpage (bt, BtLockWrite, right->latch);
+       bt_unlockpage (bt, BtLockAtomic, right->latch);
+       bt_lockpage (bt, BtLockDelete, right->latch);
+       bt_lockpage (bt, BtLockWrite, right->latch);
+       bt_freepage (bt, right);
+       return 0;
 }
 
 //     atomic modification of a batch of keys.
@@ -2464,7 +2547,7 @@ BtPageSet set[1], prev[1];
 unsigned char value[BtId];
 BtKey *key, *ptr, *key2;
 BtLatchSet *latch;
-AtomicMod *locks;
+AtomicTxn *locks;
 int result = 0;
 BtSlot temp[1];
 BtPage page;
@@ -2472,7 +2555,7 @@ BtVal *val;
 uid right;
 int type;
 
-  locks = calloc (source->cnt + 1, sizeof(AtomicMod));
+  locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
   head = NULL;
   tail = NULL;
 
@@ -2509,7 +2592,7 @@ int type;
          if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
                slot = bt_findslot(set->page, key->key, key->len);
          else // release read on previous page
-               bt_unlockpage(BtLockRead, set->latch); 
+               bt_unlockpage(bt, BtLockRead, set->latch); 
 
        if( !slot )
          if( slot = bt_atomicload(bt, set, key->key, key->len) )
@@ -2541,13 +2624,13 @@ int type;
 
                  // return constraint violation if key already exists
 
-                 bt_unlockpage(BtLockRead, set->latch);
+                 bt_unlockpage(bt, BtLockRead, set->latch);
                  result = src;
 
                  while( src ) {
                        if( locks[src].entry ) {
                          set->latch = bt->mgr->latchsets + locks[src].entry;
-                         bt_unlockpage(BtLockAtomic, set->latch);
+                         bt_unlockpage(bt, BtLockAtomic, set->latch);
                          bt_unpinlatch (set->latch);
                        }
                        src--;
@@ -2562,7 +2645,7 @@ int type;
   //  unlock last loadpage lock
 
   if( source->cnt > 1 )
-       bt_unlockpage(BtLockRead, set->latch);
+       bt_unlockpage(bt, BtLockRead, set->latch);
 
   //  obtain write lock for each master page
 
@@ -2570,7 +2653,7 @@ int type;
        if( locks[src].reuse )
          continue;
        else
-         bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
+         bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
 
   // insert or delete each key
   // process any splits or merges
@@ -2613,6 +2696,7 @@ int type;
        samepage = src;
 
        //  pick-up all splits from master page
+       //      each one is already WriteLocked.
 
        entry = prev->latch->split;
 
@@ -2621,14 +2705,15 @@ int type;
          set->page = bt_mappage (bt, set->latch);
          entry = set->latch->split;
 
-         // delete empty master page
+         // delete empty master page by undoing its split
+         //  (this is potentially another empty page)
+         //  note that there are no new left pointers yet
 
          if( !prev->page->act ) {
                memcpy (set->page->left, prev->page->left, BtId);
                memcpy (prev->page, set->page, bt->mgr->page_size);
-               bt_lockpage (BtLockDelete, set->latch);
+               bt_lockpage (bt, BtLockDelete, set->latch);
                bt_freepage (bt, set);
-               bt_unpinlatch (set->latch);
 
                prev->latch->dirty = 1;
                continue;
@@ -2639,9 +2724,8 @@ int type;
          if( !set->page->act ) {
                memcpy (prev->page->right, set->page->right, BtId);
                prev->latch->split = set->latch->split;
-               bt_lockpage (BtLockDelete, set->latch);
+               bt_lockpage (bt, BtLockDelete, set->latch);
                bt_freepage (bt, set);
-               bt_unpinlatch (set->latch);
                continue;
          }
 
@@ -2662,9 +2746,11 @@ int type;
 
          tail = leaf;
 
+         // splice in the left link into the split page
+
          bt_putid (set->page->left, prev->latch->page_no);
-         bt_lockpage(BtLockParent, prev->latch);
-         bt_unlockpage(BtLockWrite, prev->latch);
+         bt_lockpage(bt, BtLockParent, prev->latch);
+         bt_unlockpage(bt, BtLockWrite, prev->latch);
          *prev = *set;
        }
 
@@ -2672,19 +2758,19 @@ int type;
        //      (if all splits were reversed, latch->split == 0)
 
        if( latch->split ) {
-         //  fix left pointer in master's original (now split) right sibling
-         //    or set rightmost page in page zero
+         //  fix left pointer in master's original (now split)
+         //  far right sibling or set rightmost page in page zero
 
          if( right = bt_getid (prev->page->right) ) {
-               if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
+               if( set->latch = bt_pinlatch (bt, right, 1) )
                  set->page = bt_mappage (bt, set->latch);
                else
                  return -1;
 
-           bt_lockpage (BtLockWrite, set->latch);
+           bt_lockpage (bt, BtLockWrite, set->latch);
            bt_putid (set->page->left, prev->latch->page_no);
                set->latch->dirty = 1;
-           bt_unlockpage (BtLockWrite, set->latch);
+           bt_unlockpage (bt, BtLockWrite, set->latch);
                bt_unpinlatch (set->latch);
          } else {      // prev is rightmost page
            bt_spinwritelock (bt->mgr->lock);
@@ -2709,20 +2795,20 @@ int type;
 
          tail = leaf;
 
-         bt_lockpage(BtLockParent, prev->latch);
-         bt_unlockpage(BtLockWrite, prev->latch);
+         bt_lockpage(bt, BtLockParent, prev->latch);
+         bt_unlockpage(bt, BtLockWrite, prev->latch);
 
          //  remove atomic lock on master page
 
-         bt_unlockpage(BtLockAtomic, latch);
+         bt_unlockpage(bt, BtLockAtomic, latch);
          continue;
        }
 
        //  finished if prev page occupied (either master or final split)
 
        if( prev->page->act ) {
-         bt_unlockpage(BtLockWrite, latch);
-         bt_unlockpage(BtLockAtomic, latch);
+         bt_unlockpage(bt, BtLockWrite, latch);
+         bt_unlockpage(bt, BtLockAtomic, latch);
          bt_unpinlatch(latch);
          continue;
        }
@@ -2731,71 +2817,24 @@ int type;
        // master page located in prev is empty, delete it
        // by pulling over master's right sibling.
 
-       // Delete empty master's fence key
+       // Remove empty master's fence key
 
        ptr = keyptr(prev->page,prev->page->cnt);
-       leaf = calloc (sizeof(AtomicKey), 1);
 
-       memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
-       leaf->page_no = prev->latch->page_no;
-       leaf->entry = prev->latch->entry;
-       leaf->type = 1;
-  
-       if( tail )
-         tail->next = leaf;
-       else
-         head = leaf;
-
-       tail = leaf;
-
-       bt_lockpage(BtLockParent, prev->latch);
-       bt_unlockpage(BtLockWrite, prev->latch);
-
-       // grab master's right sibling
-       //      note that the far right page never empties because
-       //      it always contains (at least) the infinite stopper key
-       //      and that all pages that don't contain any keys have
-       //      been deleted, or are being deleted under write lock.
-
-       if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
-               set->page = bt_mappage (bt, set->latch);
-       else
+       if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
                return -1;
 
-       bt_lockpage(BtLockWrite, set->latch);
-
-       //      and pull contents over empty page
-       //      while preserving master's left link
-
-       memcpy (set->page->left, prev->page->left, BtId);
-       memcpy (prev->page, set->page, bt->mgr->page_size);
-
-       //      forward seekers to old right sibling
-       //      to new page location in prev
-
-       bt_putid (set->page->right, prev->latch->page_no);
-       set->latch->dirty = 1;
-       set->page->kill = 1;
-
-       //      add new fence key for new master page contents, delete
-       //      right sibling after parent posts the new master fence.
+       //      perform the remainder of the delete
+       //      from the FIFO queue
 
-       ptr = keyptr(set->page,set->page->cnt);
        leaf = calloc (sizeof(AtomicKey), 1);
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
        leaf->page_no = prev->latch->page_no;
-       leaf->entry = set->latch->entry;
+       leaf->entry = prev->latch->entry;
+       leaf->nounlock = 1;
        leaf->type = 2;
   
-       //  see if right sibling key update is in the FIFO,
-       //      and ParentLock if not there.
-
-       if( !bt_parentmatch (head, leaf->entry) )
-         bt_lockpage(BtLockParent, set->latch);
-
-       bt_unlockpage(BtLockWrite, set->latch);
-
        if( tail )
          tail->next = leaf;
        else
@@ -2803,25 +2842,10 @@ int type;
 
        tail = leaf;
 
-       //  fix new master's right sibling's left pointer
+       //      leave atomic lock in place until
+       //      deletion completes in next phase.
 
-       if( right = bt_getid (set->page->right) ) {
-         if( set->latch = bt_pinlatch (bt, right, 1) )
-               set->page = bt_mappage (bt, set->latch);
-
-         bt_lockpage (BtLockWrite, set->latch);
-         bt_putid (set->page->left, prev->latch->page_no);
-         set->latch->dirty = 1;
-
-         bt_unlockpage (BtLockWrite, set->latch);
-         bt_unpinlatch (set->latch);
-       } else {        // master is now the far right page
-         bt_spinwritelock (bt->mgr->lock);
-         bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
-         bt_spinreleasewrite(bt->mgr->lock);
-       }
-
-       bt_unlockpage(BtLockAtomic, latch);
+       bt_unlockpage(bt, BtLockWrite, prev->latch);
   }
   
   //  add & delete keys for any pages split or merged during transaction
@@ -2847,18 +2871,15 @@ int type;
 
                break;
 
-         case 2:       // insert key & free
-           if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+         case 2:       // free page
+               if( bt_atomicfree (bt, set) )
                  return -1;
 
-               bt_lockpage (BtLockDelete, set->latch);
-               bt_lockpage (BtLockWrite, set->latch);
-               bt_freepage (bt, set);
                break;
          }
 
          if( !leaf->nounlock )
-           bt_unlockpage (BtLockParent, set->latch);
+           bt_unlockpage (bt, BtLockParent, set->latch);
 
          bt_unpinlatch (set->latch);
          tail = leaf->next;
@@ -2883,9 +2904,9 @@ BtPageSet set[1];
        else
                return 0;
 
-    bt_lockpage(BtLockRead, set->latch);
+    bt_lockpage(bt, BtLockRead, set->latch);
        memcpy (bt->cursor, set->page, bt->mgr->page_size);
-    bt_unlockpage(BtLockRead, set->latch);
+    bt_unlockpage(bt, BtLockRead, set->latch);
        bt_unpinlatch (set->latch);
 
        bt->cursor_page = page_no;
@@ -2916,9 +2937,9 @@ findourself:
        else
                return 0;
 
-    bt_lockpage(BtLockRead, set->latch);
+    bt_lockpage(bt, BtLockRead, set->latch);
        memcpy (bt->cursor, set->page, bt->mgr->page_size);
-       bt_unlockpage(BtLockRead, set->latch);
+       bt_unlockpage(bt, BtLockRead, set->latch);
        bt_unpinlatch (set->latch);
        
        next = bt_getid (bt->cursor->right);
@@ -2964,11 +2985,11 @@ uid right;
        else
                return 0;
 
-    bt_lockpage(BtLockRead, set->latch);
+    bt_lockpage(bt, BtLockRead, set->latch);
 
        memcpy (bt->cursor, set->page, bt->mgr->page_size);
 
-       bt_unlockpage(BtLockRead, set->latch);
+       bt_unlockpage(bt, BtLockRead, set->latch);
        bt_unpinlatch (set->latch);
        slot = 0;
 
@@ -2993,7 +3014,7 @@ uint slot;
 
        bt->cursor_page = set->latch->page_no;
 
-       bt_unlockpage(BtLockRead, set->latch);
+       bt_unlockpage(bt, BtLockRead, set->latch);
        bt_unpinlatch (set->latch);
        return slot;
 }
@@ -3277,7 +3298,7 @@ FILE *in;
                        }
                        else if( len < BT_maxkey )
                                key[len++] = ch;
-               fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
+               fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
                break;
 
        case 'w':
@@ -3322,7 +3343,7 @@ FILE *in;
                                set->page = bt_mappage (bt, set->latch);
                        else
                                fprintf(stderr, "unable to obtain latch"), exit(1);
-                       bt_lockpage (BtLockRead, set->latch);
+                       bt_lockpage (bt, BtLockRead, set->latch);
                        next = bt_getid (set->page->right);
 
                        for( slot = 0; slot++ < set->page->cnt; )
@@ -3341,7 +3362,7 @@ FILE *in;
                                cnt++;
                           }
 
-                       bt_unlockpage (BtLockRead, set->latch);
+                       bt_unlockpage (bt, BtLockRead, set->latch);
                        bt_unpinlatch (set->latch);
                } while( page_no = next );