]> pd.if.org Git - btree/blobdiff - fosterbtreeg.c
Make bt_splitpage more conservative wrt posting fence keys
[btree] / fosterbtreeg.c
index 7e78f6087baa7ed48272c91985b6b495b8b3d6db..e43f57677abd410f569741dd144f7f7f31b682b0 100644 (file)
@@ -1,5 +1,5 @@
 // foster btree version g
-// 23 JAN 2014
+// 29 JAN 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -100,7 +100,7 @@ typedef enum{
 //     the tod field from the key.
 
 //     Keys are marked dead, but remain on the page until
-//     it cleanup is called. The fence key (highest key) for
+//     cleanup is called. The fence key (highest key) for
 //     the page is always present, even after cleanup.
 
 typedef struct {
@@ -218,7 +218,6 @@ typedef struct {
        uint mode;                                      // read-write mode
 #ifdef unix
        int idx;
-       char *pooladvise;                       // bit maps for pool page advisements
 #else
        HANDLE idx;
 #endif
@@ -248,6 +247,7 @@ typedef struct {
        BtLatchSet *set;        // current page latch set
        BtPool *pool;           // current page pool
        unsigned char *mem;     // frame, cursor, page memory buffer
+       int foster;                     // last search was to foster child
        int found;                      // last delete was found
        int err;                        // last error
 } BtDb;
@@ -272,6 +272,11 @@ extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_nextkey   (BtDb *bt, uint slot);
 
+//     internal functions
+BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
+uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
+BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
+
 //     manager functions
 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
 void bt_mgrclose (BtMgr *mgr);
@@ -722,7 +727,6 @@ uint slot;
        free (mgr->pool);
        free (mgr->hash);
        free (mgr->latch);
-       free (mgr->pooladvise);
        free (mgr);
 #else
        FlushFileBuffers(mgr->idx);
@@ -855,7 +859,6 @@ SYSTEM_INFO sysinfo[1];
        mgr->pool = calloc (poolmax, sizeof(BtPool));
        mgr->hash = calloc (hashsize, sizeof(ushort));
        mgr->latch = calloc (hashsize, sizeof(BtLatch));
-       mgr->pooladvise = calloc (poolmax, (mgr->poolmask + 8) / 8);
 #else
        mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
        mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
@@ -1104,8 +1107,6 @@ int flag;
        pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
        if( pool->map == MAP_FAILED )
                return bt->err = BTERR_map;
-       // clear out madvise issued bits
-       memset (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8) / 8), 0, (bt->mgr->poolmask + 8)/8);
 #else
        flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
        pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
@@ -1128,17 +1129,6 @@ uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
 BtPage page;
 
        page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
-#ifdef unix
-       {
-       uint idx = subpage / 8;
-       uint bit = subpage % 8;
-
-               if( ~((bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] >> bit) & 1 ) {
-                 madvise (page, bt->mgr->page_size, MADV_WILLNEED);
-                 (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] |= 1 << bit;
-               }
-       }
-#endif
        return page;
 }
 
@@ -1356,7 +1346,7 @@ int reuse;
 
        //      lock allocation page
 
-       bt_spinwritelock(bt->mgr->latchmgr->lock, 1);
+       bt_spinwritelock(bt->mgr->latchmgr->lock, 0);
 
        // use empty chain first
        // else allocate empty page
@@ -1416,31 +1406,47 @@ int reuse;
 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
 {
 uint diff, higher = bt->page->cnt, low = 1, slot;
+uint good = 0;
+
+       //      if no right link
+       //        make stopper key an infinite fence value
+       //        by setting the good flag
+
+       if( bt_getid (bt->page->right) )
+               higher++;
+       else
+               good++;
 
-       //      low is the lowest candidate, higher is already
-       //      tested as .ge. the given key, loop ends when they meet
+       //      low is the next candidate.
+       //  loop ends when they meet
+
+       //  if good, higher is already
+       //      tested as .ge. the given key.
 
        while( diff = higher - low ) {
                slot = low + ( diff >> 1 );
                if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
                        low = slot + 1;
                else
-                       higher = slot;
+                       higher = slot, good++;
        }
 
-       return higher;
+       //      return zero if key is on right link page
+
+       return good ? higher : 0;
 }
 
 //  find and load page at given level for given key
 //     leave page rd or wr locked as requested
 
-int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, BtLock lock)
+uint bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, BtLock lock)
 {
 uid page_no = ROOT_page, prevpage = 0;
 BtLatchSet *set, *prevset;
 uint drill = 0xff, slot;
 uint mode, prevmode;
 BtPool *prevpool;
+int foster = 0;
 
   //  start at root of btree and drill down
 
@@ -1495,47 +1501,57 @@ BtPool *prevpool;
                }
          }
 
-       prevpage = bt->page_no;
-       prevpool = bt->pool;
-       prevset = bt->set;
-       prevmode = mode;
-
        //  find key on page at this level
        //  and either descend to requested level
        //      or return key slot
 
-       slot = bt_findslot (bt, key, len);
+       if( slot = bt_findslot (bt, key, len) ) {
+         //    is this slot < foster child area
+         //    on the requested level?
 
-       //      is this slot < foster child area
-       //      on the requested level?
+         //    if so, return actual slot even if dead
 
-       //      if so, return actual slot even if dead
+         if( slot <= bt->page->cnt - bt->page->foster )
+          if( drill == lvl )
+               return bt->foster = foster, slot;
 
-       if( slot <= bt->page->cnt - bt->page->foster )
-         if( drill == lvl )
-               return slot;
+         //    find next active slot
+         //    note: foster children are never dead
+
+         while( slotptr(bt->page, slot)->dead )
+          if( slot++ < bt->page->cnt )
+               continue;
+          else {
+               //  we are waiting for fence key posting
+               page_no = bt_getid(bt->page->right);
+               goto slideright;
+         }
 
-       //      find next active slot
+        //     is this slot < foster child area
+        //     if so, drill to next level
 
-       //      note: foster children are never dead
-       //      nor fence keys for interiour nodes
+        if( slot <= bt->page->cnt - bt->page->foster )
+               foster = 0, drill--;
+        else
+               foster = 1;
 
-       while( slotptr(bt->page, slot)->dead )
-         if( slot++ < bt->page->cnt )
-               continue;
-         else
-               return bt->err = BTERR_struct, 0;       // last key shouldn't be deleted
+         //  continue right onto foster child
+         //    or down to next level.
 
-       //      is this slot < foster child area
-       //      if so, drill to next level
+         page_no = bt_getid(slotptr(bt->page, slot)->id);
 
-       if( slot <= bt->page->cnt - bt->page->foster )
-               drill--;
+       //  or slide right into next page
 
-       //  continue right onto foster child
-       //      or down to next level.
+       } else {
+               page_no = bt_getid(bt->page->right);
+               foster = 1;
+       }
 
-       page_no = bt_getid(slotptr(bt->page, slot)->id);
+slideright:
+       prevpage = bt->page_no;
+       prevpool = bt->pool;
+       prevset = bt->set;
+       prevmode = mode;
 
   } while( page_no );
 
@@ -1545,183 +1561,387 @@ BtPool *prevpool;
   return 0;    // return error
 }
 
-//  find and delete key on page by marking delete flag bit
-//  when leaf page becomes empty, delete it from the btree
+//  remove empty page from the B-tree
+//     by pulling our right node left over ourselves
 
-BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
+//  call with bt->page, etc, set to page's locked parent
+//     returns with page locked.
+
+BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
 {
-unsigned char leftkey[256];
-BtLatchSet *rset, *set;
-BtPool *pool, *rpool;
-BtPage rpage, page;
-uid page_no, right;
-uint slot, tod;
+BtLatchSet *rset, *pset, *rpset;
+BtPool *rpool, *ppool, *rppool;
+BtPage rpage, ppage, rppage;
+uid right, parent, rparent;
 BtKey ptr;
+uint idx;
 
-       if( slot = bt_loadpage (bt, key, len, 0, BtLockWrite) )
-               ptr = keyptr(bt->page, slot);
+       //      cache node's parent page
+
+       parent = bt->page_no;
+       ppage = bt->page;
+       ppool = bt->pool;
+       pset = bt->set;
+
+       // lock and map our right page
+       // note that it cannot be our foster child
+       // since the our node is empty
+       //      and it cannot be NULL because of the stopper
+       //      in the last right page
+
+       bt_lockpage (BtLockWrite, set);
+
+       // if we aren't dead yet
+
+       if( page->act )
+               goto rmergexit;
+
+       if( right = bt_getid (page->right) )
+         if( rpool = bt_pinpool (bt, right) )
+               rpage = bt_page (bt, rpool, right);
+         else
+               return bt->err;
        else
+               return bt->err = BTERR_struct;
+
+       rset = bt_pinlatch (bt, right);
+
+       //      find our right neighbor
+
+       if( ppage->act > 1 ) {
+        for( idx = slot; idx++ < ppage->cnt; )
+         if( !slotptr(ppage, idx)->dead )
+               break;
+
+        if( idx > ppage->cnt )
+               return bt->err = BTERR_struct;
+
+        //  redirect right neighbor in parent to left node
+
+        bt_putid(slotptr(ppage,idx)->id, page_no);
+       }
+
+       //      if parent has only our deleted page, e.g. no right neighbor
+       //      prepare to merge parent itself
+
+       if( ppage->act == 1 ) {
+         if( rparent = bt_getid (ppage->right) )
+          if( rppool = bt_pinpool (bt, rparent) )
+               rppage = bt_page (bt, rppool, rparent);
+          else
                return bt->err;
+         else
+               return bt->err = BTERR_struct;
 
-       // if key is found delete it, otherwise ignore request
-       // note that fence keys of interiour nodes are not deleted.
+         rpset = bt_pinlatch (bt, rparent);
+         bt_lockpage (BtLockWrite, rpset);
 
-       if( bt->found = !keycmp (ptr, key, len) )
-               if( bt->found = slotptr(bt->page, slot)->dead == 0 ) {
-                       slotptr(bt->page,slot)->dead = 1;
-                       if( slot < bt->page->cnt )
-                               bt->page->dirty = 1;
-                       bt->page->act--;
+         // find our right neighbor on right parent page
+
+         for( idx = 0; idx++ < rppage->cnt; )
+               if( !slotptr(rppage, idx)->dead ) {
+                 bt_putid (slotptr(rppage, idx)->id, page_no);
+                 break;
                }
 
-       page_no = bt->page_no;
-       pool = bt->pool;
-       page = bt->page;
-       set = bt->set;
+         if( idx > rppage->cnt )
+               return bt->err = BTERR_struct;
+       }
 
-       // return if page is not empty or not found
+       //      now that there are no more pointers to our right node
+       //      we can wait for delete lock on it
 
-       if( page->act || !bt->found ) {
-               bt_unlockpage(BtLockWrite, set);
-               bt_unpinlatch (set);
-               bt_unpinpool (pool);
-               return bt->err;
+       bt_lockpage(BtLockDelete, rset);
+       bt_lockpage(BtLockWrite, rset);
+
+       // pull contents of right page into our empty page 
+
+       memcpy (page, rpage, bt->mgr->page_size);
+
+       // ready to release right parent lock
+       //      now that we have a new page in place
+
+       if( ppage->act == 1 ) {
+         bt_unlockpage (BtLockWrite, rpset);
+         bt_unpinlatch (rpset);
+         bt_unpinpool (rppool);
        }
 
-       // cache copy of fence key of empty node
+       //      add killed right block to free chain
+       //      lock latch mgr
 
-       ptr = keyptr(page, page->cnt);
-       memcpy(leftkey, ptr, ptr->len + 1);
+       bt_spinwritelock(bt->mgr->latchmgr->lock, 0);
 
-       //      release write lock on empty node
-       //      obtain Parent lock
+       //      store free chain in allocation page second right
 
-       bt_unlockpage(BtLockWrite, set);
-       bt_lockpage(BtLockParent, set);
+       bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+       bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
 
-       //      load and lock parent to see
-       //  if delete of empty node is OK
-       //      ie, not a fence key of parent
+       // unlock latch mgr and right page
 
-       while( 1 ) {
-         if( slot = bt_loadpage (bt, leftkey+1, *leftkey, 1, BtLockWrite) )
-               ptr = keyptr(bt->page, slot);
-         else
+       bt_unlockpage(BtLockDelete, rset);
+       bt_unlockpage(BtLockWrite, rset);
+       bt_unpinlatch (rset);
+       bt_unpinpool (rpool);
+
+       bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0);
+
+       // delete our obsolete fence key from our parent
+
+       slotptr(ppage, slot)->dead = 1;
+       ppage->dirty = 1;
+
+       //      if our parent now empty
+       //      remove it from the tree
+
+       if( ppage->act-- == 1 )
+         if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
                return bt->err;
 
-         // does parent level contain our fence key yet?
-         // and is it free of foster children?
+rmergexit:
+       bt_unlockpage (BtLockWrite, pset);
+       bt_unpinlatch (pset);
+       bt_unpinpool (ppool);
 
-         if( !bt->page->foster )
-          if( !keycmp (ptr, leftkey+1, *leftkey) )
-               break;
+       bt->found = 1;
+       return bt->err = 0;
+}
+
+//  remove empty page from the B-tree
+//     try merging left first.  If no left
+//     sibling, then merge right.
 
-         bt_unlockpage(BtLockWrite, bt->set);
-         bt_unpinlatch (bt->set);
-         bt_unpinpool (bt->pool);
+//     call with page loaded and locked,
+//  return with page locked.
+
+BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
+{
+unsigned char fencekey[256], postkey[256];
+uint slot, idx, postfence = 0;
+BtLatchSet *lset, *pset;
+BtPool *lpool, *ppool;
+BtPage lpage, ppage;
+uid left, parent;
+BtKey ptr;
+
+       ptr = keyptr(page, page->cnt);
+       memcpy(fencekey, ptr, ptr->len + 1);
+       bt_unlockpage (BtLockWrite, set);
+
+       //      load and lock our parent
+
+retry:
+       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
+               return bt->err;
+
+       parent = bt->page_no;
+       ppage = bt->page;
+       ppool = bt->pool;
+       pset = bt->set;
+
+       //      wait until we are not a foster child
+
+       if( bt->foster ) {
+               bt_unlockpage (BtLockWrite, pset);
+               bt_unpinlatch (pset);
+               bt_unpinpool (ppool);
 #ifdef unix
-         sched_yield();
+               sched_yield();
 #else
-         SwitchToThread();
+               SwitchToThread();
 #endif
+               goto retry;
        }
 
-       //      find our left fence key
+       //  find our left neighbor in our parent page
 
-       while( slotptr(bt->page, slot)->dead )
-         if( slot++ < bt->page->cnt )
-               continue;
-         else
-               return bt->err = BTERR_struct;  // last key shouldn't be deleted
+       for( idx = slot; --idx; )
+         if( !slotptr(ppage, idx)->dead )
+               break;
 
-       //      now we have both parent and child
+       //      if no left neighbor, do right merge
 
-       bt_lockpage(BtLockDelete, set);
-       bt_lockpage(BtLockWrite, set);
+       if( !idx )
+               return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
 
-       // return if page has no right sibling within parent
-       //       or if empty node is no longer empty
+       // lock and map our left neighbor's page
 
-       if( page->act || slot == bt->page->cnt ) {
-               // unpin parent
-               bt_unlockpage(BtLockWrite, bt->set);
-               bt_unpinlatch (bt->set);
-               bt_unpinpool (bt->pool);
-               // unpin empty node
-               bt_unlockpage(BtLockParent, set);
-               bt_unlockpage(BtLockDelete, set);
-               bt_unlockpage(BtLockWrite, set);
-               bt_unpinlatch (set);
-               bt_unpinpool (pool);
+       left = bt_getid (slotptr(ppage, idx)->id);
+
+       if( lpool = bt_pinpool (bt, left) )
+               lpage = bt_page (bt, lpool, left);
+       else
                return bt->err;
+
+       lset = bt_pinlatch (bt, left);
+       bt_lockpage(BtLockWrite, lset);
+
+       //  wait until foster sibling is in our parent
+
+       if( bt_getid (lpage->right) != page_no ) {
+               bt_unlockpage (BtLockWrite, pset);
+               bt_unpinlatch (pset);
+               bt_unpinpool (ppool);
+               bt_unlockpage (BtLockWrite, lset);
+               bt_unpinlatch (lset);
+               bt_unpinpool (lpool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               goto retry;
        }
 
-       // lock and map our right page
-       // note that it cannot be our foster child
-       // since the our node is empty
+       //  since our page will have no more pointers to it,
+       //      obtain Delete lock and wait for write locks to clear
 
-       right = bt_getid(page->right);
+       bt_lockpage(BtLockDelete, set);
+       bt_lockpage(BtLockWrite, set);
 
-       if( rpool = bt_pinpool (bt, right) )
-               rpage = bt_page (bt, rpool, right);
-       else
-               return bt->err;
+       //      if we aren't dead yet,
+       //      get ready for exit
 
-       rset = bt_pinlatch (bt, right);
-       bt_lockpage(BtLockWrite, rset);
-       bt_lockpage(BtLockDelete, rset);
+       if( page->act ) {
+               bt_unlockpage(BtLockDelete, set);
+               bt_unlockpage(BtLockWrite, lset);
+               bt_unpinlatch (lset);
+               bt_unpinpool (lpool);
+               goto lmergexit;
+       }
 
-       // pull contents of right page into empty page 
+       //      are we are the fence key for our parent?
+       //      if so, grab our old fence key
 
-       memcpy (page, rpage, bt->mgr->page_size);
+       if( postfence = slot == ppage->cnt ) {
+               ptr = keyptr (ppage, ppage->cnt);
+               memcpy(fencekey, ptr, ptr->len + 1);
+               memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
 
-       //      delete left parent slot for old empty page
-       //      and redirect right parent slot to it
+               // clear out other dead slots
 
-       bt->page->act--;
-       bt->page->dirty = 1;
-       slotptr(bt->page, slot)->dead = 1;
+               while( --ppage->cnt )
+                 if( slotptr(ppage, ppage->cnt)->dead )
+                       memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
+                 else
+                       break;
 
-       while( slot++ < bt->page->cnt )
-         if( !slotptr(bt->page, slot)->dead )
-               break;
+               ptr = keyptr (ppage, ppage->cnt);
+               memcpy(postkey, ptr, ptr->len + 1);
+       } else
+               slotptr(ppage,slot)->dead = 1;
 
-       bt_putid(slotptr(bt->page,slot)->id, page_no);
+       ppage->dirty = 1;
+       ppage->act--;
 
-       // release parent level lock
-       //      and our empty node lock
+       //      push our right neighbor pointer to our left
 
-       bt_unlockpage(BtLockWrite, set);
-       bt_unlockpage(BtLockWrite, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       memcpy (lpage->right, page->right, BtId);
 
-       //      add killed right block to free chain
+       //      add ourselves to free chain
        //      lock latch mgr
 
        bt_spinwritelock(bt->mgr->latchmgr->lock, 0);
 
        //      store free chain in allocation page second right
-       bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
-       bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+       bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+       bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
 
-       // unlock latch mgr and right page
+       // unlock latch mgr and pages
 
        bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0);
+       bt_unlockpage(BtLockWrite, lset);
+       bt_unpinlatch (lset);
+       bt_unpinpool (lpool);
 
-       bt_unlockpage(BtLockWrite, rset);
-       bt_unlockpage(BtLockDelete, rset);
-       bt_unpinlatch (rset);
-       bt_unpinpool (rpool);
+       // release our node's delete lock
 
-       //      remove ParentModify lock
-
-       bt_unlockpage(BtLockParent, set);
        bt_unlockpage(BtLockDelete, set);
+
+lmergexit:
+       bt_unlockpage (BtLockWrite, pset);
+       bt_unpinpool (ppool);
+
+       //  do we need to post parent's fence key in its parent?
+
+       if( !postfence || parent == ROOT_page ) {
+               bt_unpinlatch (pset);
+               bt->found = 1;
+               return bt->err = 0;
+       }
+
+       //      interlock parent fence post
+
+       bt_lockpage (BtLockParent, pset);
+
+       //      load parent's parent page
+posttry:
+       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
+               return bt->err;
+
+       if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
+         if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+               return bt->err;
+         else
+               goto posttry;
+
+       page = bt->page;
+
+       page->min -= *postkey + 1;
+       ((unsigned char *)page)[page->min] = *postkey;
+       memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
+       slotptr(page, slot)->off = page->min;
+
+       bt_unlockpage (BtLockParent, pset);
+       bt_unpinlatch (pset);
+
+       bt_unlockpage (BtLockWrite, bt->set);
+       bt_unpinlatch (bt->set);
+       bt_unpinpool (bt->pool);
+
+       bt->found = 1;
+       return bt->err = 0;
+}
+
+//  find and delete key on page by marking delete flag bit
+//  if page becomes empty, delete it from the btree
+
+BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
+{
+BtLatchSet *set;
+BtPool *pool;
+BtPage page;
+uid page_no;
+BtKey ptr;
+uint slot;
+
+       if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
+               return bt->err;
+
+       page_no = bt->page_no;
+       page = bt->page;
+       pool = bt->pool;
+       set = bt->set;
+
+       // if key is found delete it, otherwise ignore request
+
+       ptr = keyptr(page, slot);
+
+       if( bt->found = !keycmp (ptr, key, len) )
+         if( bt->found = slotptr(page, slot)->dead == 0 ) {
+               slotptr(page,slot)->dead = 1;
+                 if( slot < page->cnt )
+                       page->dirty = 1;
+                 if( !--page->act )
+                       if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
+                         return bt->err;
+               }
+
+       bt_unlockpage(BtLockWrite, set);
        bt_unpinlatch (set);
        bt_unpinpool (pool);
-       return 0;
-} 
+       return bt->err = 0;
+}
 
 //     find key in leaf level and return row-id
 
@@ -1755,13 +1975,12 @@ uid id;
 //     0 - page needs splitting
 //     >0  new slot value
 
-uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
+uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
 {
 uint nxt = bt->mgr->page_size;
-BtPage page = bt->page;
 uint cnt = 0, idx = 0;
 uint max = page->cnt;
-uint newslot;
+uint newslot = max;
 BtKey key;
 
        if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
@@ -1786,12 +2005,11 @@ BtKey key;
        // otherwise, remove deleted key
 
        // note: foster children are never dead
-       //      nor are fence keys for interiour nodes
 
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 1;
-               else if( cnt < max && slotptr(bt->frame,cnt)->dead )
+               if( cnt < max && slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy key
@@ -1822,9 +2040,8 @@ BtKey key;
 //     add key to current page
 //     page must already be writelocked
 
-void bt_addkeytopage (BtDb *bt, uint slot, unsigned char *key, uint len, uid id, uint tod)
+void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
 {
-BtPage page = bt->page;
 uint idx;
 
        // find next available dead slot and copy key onto page
@@ -1926,27 +2143,21 @@ BtKey key;
 }
 
 //  split already locked full node
-//     in current page variables
 //     return unlocked and unpinned.
 
-BTERR bt_splitpage (BtDb *bt)
+BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
 {
 uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
 unsigned char fencekey[256];
-uid page_no = bt->page_no;
-BtLatchSet *set = bt->set;
-BtPool *pool = bt->pool;
-BtPage page = bt->page;
 uint tod = time(NULL);
 uint lvl = page->lvl;
-uid new_page, right;
+uid new_page;
 BtKey key;
 
        //      initialize frame buffer for right node
 
        memset (bt->frame, 0, bt->mgr->page_size);
        max = page->cnt - page->foster;
-       tod = (uint)time(NULL);
        cnt = max / 2;
        idx = 0;
 
@@ -1967,10 +2178,8 @@ BtKey key;
 
        // transfer right link node to new right node
 
-       if( page_no > ROOT_page ) {
-               right = bt_getid (page->right);
-               bt_putid(bt->frame->right, right);
-       }
+       if( page_no > ROOT_page )
+               memcpy (bt->frame->right, page->right, BtId);
 
        bt->frame->bits = bt->mgr->page_bits;
        bt->frame->min = nxt;
@@ -2097,10 +2306,18 @@ BtKey key;
        page->cnt--;
        page->act--;
 
+       bt_unlockpage (BtLockParent, set);
+
+       //  if this emptied page,
+       //      undo the foster child
+
+       if( !page->act )
+         if( bt_mergeleft (bt, page, pool, set, page_no, lvl) )
+               return bt->err;
+
        //      unlock and unpin
 
        bt_unlockpage (BtLockWrite, set);
-       bt_unlockpage (BtLockParent, set);
        bt_unpinlatch (set);
        bt_unpinpool (pool);
        return 0;
@@ -2142,14 +2359,14 @@ BtKey ptr;
 
                // check if page has enough space
 
-               if( slot = bt_cleanpage (bt, len, slot) )
+               if( slot = bt_cleanpage (bt, bt->page, len, slot) )
                        break;
 
-               if( bt_splitpage (bt) )
+               if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
                        return bt->err;
        }
 
-       bt_addkeytopage (bt, slot, key, len, id, tod);
+       bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
 
        bt_unlockpage (BtLockWrite, bt->set);
        bt_unpinlatch (bt->set);
@@ -2323,6 +2540,7 @@ FILE *in;
                bt_latchaudit (bt);
                fprintf(stderr, "finished latch mgr audit\n");
                break;
+
        case 'w':
                fprintf(stderr, "started indexing for %s\n", args->infile);
                if( in = fopen (args->infile, "rb") )