]> pd.if.org Git - btree/commitdiff
rework db_deletekey to fix problems
authorunknown <karl@E04.petzent.com>
Thu, 30 Jan 2014 00:16:14 +0000 (16:16 -0800)
committerunknown <karl@E04.petzent.com>
Thu, 30 Jan 2014 00:16:14 +0000 (16:16 -0800)
threads2h.c
threads2i.c
threads2j.c

index 89e1894618ba1a1f03ce139a75dcf32af7fcf520..db5c99191b4a17de7e6dda84911ac14aea996ae2 100644 (file)
@@ -1,5 +1,5 @@
 // btree version threads2h pthread rw lock version
-// 24 JAN 2014
+// 29 JAN 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -125,7 +125,7 @@ typedef struct {
 typedef struct {
        BtLatch readwr[1];                      // read/write page lock
        BtLatch access[1];                      // Access Intent/Page delete
-       BtLatch parent[1];                      // adoption of foster children
+       BtLatch parent[1];                      // Posting of fence key in parent
        BtSpinLatch busy[1];            // slot is being moved between chains
        volatile ushort next;           // next entry in hash table chain
        volatile ushort prev;           // prev entry in hash table chain
@@ -174,11 +174,9 @@ typedef struct Page {
        uint act;                                       // count of active keys
        uint min;                                       // next key offset
        unsigned char bits;                     // page size in bits
-       unsigned char lvl:6;            // level of page
-       unsigned char kill:1;           // page is being deleted
+       unsigned char lvl:7;            // level of page
        unsigned char dirty:1;          // page has deleted keys
        unsigned char right[BtId];      // page number to right
-       BtSlot table[0];                        // array of key slots
 } *BtPage;
 
 //     The memory mapping pool table buffer manager entry
@@ -192,7 +190,7 @@ typedef struct {
        void *hashprev;                         // previous pool entry for the same hash idx
        void *hashnext;                         // next pool entry for the same hash idx
 #ifndef unix
-       HANDLE hmap;
+       HANDLE hmap;                            // Windows memory mapping handle
 #endif
 } BtPool;
 
@@ -248,6 +246,7 @@ typedef struct {
        BtLatchSet *set;        // current page latch set
        BtPool *pool;           // current page pool
        unsigned char *mem;     // frame, cursor, page memory buffer
+       int parent;                     // last loadpage was from a parent level
        int found;                      // last delete or insert was found
        int err;                        // last error
 } BtDb;
@@ -265,12 +264,17 @@ typedef enum {
 // B-Tree functions
 extern void bt_close (BtDb *bt);
 extern BtDb *bt_open (BtMgr *mgr);
-extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
-extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
+extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl);
+extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_nextkey   (BtDb *bt, uint slot);
 
+//     internal functions
+BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
+uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
+BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
+
 //     manager functions
 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
 void bt_mgrclose (BtMgr *mgr);
@@ -338,7 +342,7 @@ extern uint bt_tod (BtDb *bt, uint slot);
 
 //     Access macros to address slot and key values from the page
 
-#define slotptr(page, slot) (page->table + slot-1)
+#define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
 
 void bt_putid(unsigned char *dest, uid id)
@@ -1508,6 +1512,7 @@ BtLatchSet *set, *prevset;
 uint drill = 0xff, slot;
 uint mode, prevmode;
 BtPool *prevpool;
+int parent = 1;
 
   //  start at root of btree and drill down
 
@@ -1567,30 +1572,33 @@ BtPool *prevpool;
        //  find key on page at this level
        //  and descend to requested level
 
-       if( !bt->page->kill && (slot = bt_findslot (bt, key, len)) ) {
+       if( slot = bt_findslot (bt, key, len) ) {
          if( drill == lvl )
-               return slot;
+               return bt->parent = parent, slot;
 
          while( slotptr(bt->page, slot)->dead )
                if( slot++ < bt->page->cnt )
                        continue;
                else {
                        page_no = bt_getid(bt->page->right);
+                       parent = 0;
                        goto slideright;
                }
 
          page_no = bt_getid(slotptr(bt->page, slot)->id);
+         parent = 1;
          drill--;
        }
 
        //  or slide right into next page
-       //  (slide left from deleted page)
 
-       else
+       else {
                page_no = bt_getid(bt->page->right);
+               parent = 0;
+       }
 
        //  continue down / right using overlapping locks
-       //  to protect pages being killed or split.
+       //  to protect pages being split.
 
 slideright:
        prevpage = bt->page_no;
@@ -1605,119 +1613,384 @@ slideright:
   return 0;    // return error
 }
 
-//  find and delete key on page by marking delete flag bit
-//  when page becomes empty, delete it
+//  remove empty page from the B-tree
+//     by pulling our right node left over ourselves
+
+//  call with bt->page, etc, set to page's locked parent
+//     returns with page locked.
 
-BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
+BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
 {
-unsigned char lowerkey[256], higherkey[256];
-BtLatchSet *rset, *set;
-BtPool *pool, *rpool;
-uid page_no, right;
-uint slot, tod;
-BtPage rpage;
+BtLatchSet *rset, *pset, *rpset;
+BtPool *rpool, *ppool, *rppool;
+BtPage rpage, ppage, rppage;
+uid right, parent, rparent;
 BtKey ptr;
+uint idx;
 
-       if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
-               ptr = keyptr(bt->page, slot);
+       //      cache node's parent page
+
+       parent = bt->page_no;
+       ppage = bt->page;
+       ppool = bt->pool;
+       pset = bt->set;
+
+       // lock and map our right page
+       //      it cannot be NULL because of the stopper
+       //      in the last right page
+
+       bt_lockpage (BtLockWrite, set);
+
+       // if we aren't dead yet
+
+       if( page->act )
+               goto rmergexit;
+
+       if( right = bt_getid (page->right) )
+         if( rpool = bt_pinpool (bt, right) )
+               rpage = bt_page (bt, rpool, right);
+         else
+               return bt->err;
        else
+               return bt->err = BTERR_struct;
+
+       rset = bt_pinlatch (bt, right);
+
+       //      find our right neighbor
+
+       if( ppage->act > 1 ) {
+        for( idx = slot; idx++ < ppage->cnt; )
+         if( !slotptr(ppage, idx)->dead )
+               break;
+
+        if( idx > ppage->cnt )
+               return bt->err = BTERR_struct;
+
+        //  redirect right neighbor in parent to left node
+
+        bt_putid(slotptr(ppage,idx)->id, page_no);
+       }
+
+       //      if parent has only our deleted page, e.g. no right neighbor
+       //      prepare to merge parent itself
+
+       if( ppage->act == 1 ) {
+         if( rparent = bt_getid (ppage->right) )
+          if( rppool = bt_pinpool (bt, rparent) )
+               rppage = bt_page (bt, rppool, rparent);
+          else
                return bt->err;
+         else
+               return bt->err = BTERR_struct;
 
-       // if key is found delete it, otherwise ignore request
+         rpset = bt_pinlatch (bt, rparent);
+         bt_lockpage (BtLockWrite, rpset);
 
-       if( bt->found = !keycmp (ptr, key, len) )
-               if( bt->found = slotptr(bt->page, slot)->dead == 0 ) {
-                       slotptr(bt->page,slot)->dead = 1;
-                       if( slot < bt->page->cnt )
-                               bt->page->dirty = 1;
-                       bt->page->act--;
+         // find our right neighbor on right parent page
+
+         for( idx = 0; idx++ < rppage->cnt; )
+               if( !slotptr(rppage, idx)->dead ) {
+                 bt_putid (slotptr(rppage, idx)->id, page_no);
+                 break;
                }
 
-       // return if page is not empty, or it has no right sibling
+         if( idx > rppage->cnt )
+               return bt->err = BTERR_struct;
+       }
 
-       right = bt_getid(bt->page->right);
-       page_no = bt->page_no;
-       pool = bt->pool;
-       set = bt->set;
+       //      now that there are no more pointers to our right node
+       //      we can wait for delete lock on it
 
-       if( !right || bt->page->act ) {
-               bt_unlockpage(BtLockWrite, set);
-               bt_unpinlatch (set);
-               bt_unpinpool (pool);
+       bt_lockpage(BtLockDelete, rset);
+       bt_lockpage(BtLockWrite, rset);
+
+       // pull contents of right page into our empty page 
+
+       memcpy (page, rpage, bt->mgr->page_size);
+
+       // ready to release right parent lock
+       //      now that we have a new page in place
+
+       if( ppage->act == 1 ) {
+         bt_unlockpage (BtLockWrite, rpset);
+         bt_unpinlatch (rpset);
+         bt_unpinpool (rppool);
+       }
+
+       //      add killed right block to free chain
+       //      lock latch mgr
+
+       bt_spinwritelock(bt->mgr->latchmgr->lock);
+
+       //      store free chain in allocation page second right
+
+       bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+       bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+
+       // unlock latch mgr and right page
+
+       bt_unlockpage(BtLockDelete, rset);
+       bt_unlockpage(BtLockWrite, rset);
+       bt_unpinlatch (rset);
+       bt_unpinpool (rpool);
+
+       bt_spinreleasewrite(bt->mgr->latchmgr->lock);
+
+       // delete our obsolete fence key from our parent
+
+       slotptr(ppage, slot)->dead = 1;
+       ppage->dirty = 1;
+
+       //      if our parent now empty
+       //      remove it from the tree
+
+       if( ppage->act-- == 1 )
+         if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
+               return bt->err;
+
+rmergexit:
+       bt_unlockpage (BtLockWrite, pset);
+       bt_unpinlatch (pset);
+       bt_unpinpool (ppool);
+
+       bt->found = 1;
+       return bt->err = 0;
+}
+
+//  remove empty page from the B-tree
+//     try merging left first.  If no left
+//     sibling, then merge right.
+
+//     call with page loaded and locked,
+//  return with page locked.
+
+BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
+{
+unsigned char fencekey[256], postkey[256];
+uint slot, idx, postfence = 0;
+BtLatchSet *lset, *pset;
+BtPool *lpool, *ppool;
+BtPage lpage, ppage;
+uid left, parent;
+BtKey ptr;
+
+       ptr = keyptr(page, page->cnt);
+       memcpy(fencekey, ptr, ptr->len + 1);
+       bt_unlockpage (BtLockWrite, set);
+
+       //      load and lock our parent
+
+retry:
+       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
                return bt->err;
+
+       parent = bt->page_no;
+       ppage = bt->page;
+       ppool = bt->pool;
+       pset = bt->set;
+
+       //      wait until we are posted in our parent
+
+       if( !bt->parent ) {
+               bt_unlockpage (BtLockWrite, pset);
+               bt_unpinlatch (pset);
+               bt_unpinpool (ppool);
+#ifdef unix
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               goto retry;
        }
 
-       // obtain Parent lock over write lock
+       //  find our left neighbor in our parent page
 
-       bt_lockpage(BtLockParent, set);
+       for( idx = slot; --idx; )
+         if( !slotptr(ppage, idx)->dead )
+               break;
 
-       // keep copy of key to delete
+       //      if no left neighbor, do right merge
 
-       ptr = keyptr(bt->page, bt->page->cnt);
-       memcpy(lowerkey, ptr, ptr->len + 1);
+       if( !idx )
+               return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
 
-       // lock and map right page
+       // lock and map our left neighbor's page
 
-       if( rpool = bt_pinpool (bt, right) )
-               rpage = bt_page (bt, rpool, right);
+       left = bt_getid (slotptr(ppage, idx)->id);
+
+       if( lpool = bt_pinpool (bt, left) )
+               lpage = bt_page (bt, lpool, left);
        else
                return bt->err;
 
-       rset = bt_pinlatch (bt, right);
-       bt_lockpage(BtLockWrite, rset);
+       lset = bt_pinlatch (bt, left);
+       bt_lockpage(BtLockWrite, lset);
 
-       // pull contents of next page into current empty page 
+       //  wait until sibling is in our parent
 
-       memcpy (bt->page, rpage, bt->mgr->page_size);
+       if( bt_getid (lpage->right) != page_no ) {
+               bt_unlockpage (BtLockWrite, pset);
+               bt_unpinlatch (pset);
+               bt_unpinpool (ppool);
+               bt_unlockpage (BtLockWrite, lset);
+               bt_unpinlatch (lset);
+               bt_unpinpool (lpool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               goto retry;
+       }
 
-       //      keep copy of key to update
+       //  since our page will have no more pointers to it,
+       //      obtain Delete lock and wait for write locks to clear
 
-       ptr = keyptr(rpage, rpage->cnt);
-       memcpy(higherkey, ptr, ptr->len + 1);
+       bt_lockpage(BtLockDelete, set);
+       bt_lockpage(BtLockWrite, set);
 
-       //  Mark right page as deleted and point it to left page
-       //      until we can post updates at higher level.
+       //      if we aren't dead yet,
+       //      get ready for exit
 
-       bt_putid(rpage->right, page_no);
-       rpage->kill = 1;
-       rpage->cnt = 0;
+       if( page->act ) {
+               bt_unlockpage(BtLockDelete, set);
+               bt_unlockpage(BtLockWrite, lset);
+               bt_unpinlatch (lset);
+               bt_unpinpool (lpool);
+               goto lmergexit;
+       }
 
-       bt_unlockpage(BtLockWrite, rset);
-       bt_unlockpage(BtLockWrite, set);
+       //      are we are the fence key for our parent?
+       //      if so, grab our old fence key
 
-       //  delete old lower key to consolidated node
+       if( postfence = slot == ppage->cnt ) {
+               ptr = keyptr (ppage, ppage->cnt);
+               memcpy(fencekey, ptr, ptr->len + 1);
+               memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
 
-       if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
-               return bt->err;
+               // clear out other dead slots
 
-       //  redirect higher key directly to consolidated node
+               while( --ppage->cnt )
+                 if( slotptr(ppage, ppage->cnt)->dead )
+                       memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
+                 else
+                       break;
 
-       tod = (uint)time(NULL);
+               ptr = keyptr (ppage, ppage->cnt);
+               memcpy(postkey, ptr, ptr->len + 1);
+       } else
+               slotptr(ppage,slot)->dead = 1;
 
-       if( bt_insertkey (bt, higherkey+1, *higherkey, lvl + 1, page_no, tod) )
-               return bt->err;
+       ppage->dirty = 1;
+       ppage->act--;
 
-       //      add killed right block to free chain
+       //      push our right neighbor pointer to our left
+
+       memcpy (lpage->right, page->right, BtId);
+
+       //      add ourselves to free chain
        //      lock latch mgr
 
        bt_spinwritelock(bt->mgr->latchmgr->lock);
 
        //      store free chain in allocation page second right
-       bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
-       bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+       bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+       bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
 
-       // unlock latch mgr and unpin right page
+       // unlock latch mgr and pages
 
        bt_spinreleasewrite(bt->mgr->latchmgr->lock);
-       bt_unpinlatch (rset);
-       bt_unpinpool (rpool);
+       bt_unlockpage(BtLockWrite, lset);
+       bt_unpinlatch (lset);
+       bt_unpinpool (lpool);
+
+       // release our node's delete lock
+
+       bt_unlockpage(BtLockDelete, set);
+
+lmergexit:
+       bt_unlockpage (BtLockWrite, pset);
+       bt_unpinpool (ppool);
+
+       //  do we need to post parent's fence key in its parent?
 
-       //      remove ParentModify lock
+       if( !postfence || parent == ROOT_page ) {
+               bt_unpinlatch (pset);
+               bt->found = 1;
+               return bt->err = 0;
+       }
+
+       //      interlock parent fence post
+
+       bt_lockpage (BtLockParent, pset);
+
+       //      load parent's parent page
+posttry:
+       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
+               return bt->err;
+
+       if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
+         if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+               return bt->err;
+         else
+               goto posttry;
+
+       page = bt->page;
+
+       page->min -= *postkey + 1;
+       ((unsigned char *)page)[page->min] = *postkey;
+       memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
+       slotptr(page, slot)->off = page->min;
+
+       bt_unlockpage (BtLockParent, pset);
+       bt_unpinlatch (pset);
+
+       bt_unlockpage (BtLockWrite, bt->set);
+       bt_unpinlatch (bt->set);
+       bt_unpinpool (bt->pool);
 
-       bt_unlockpage(BtLockParent, set);
+       bt->found = 1;
+       return bt->err = 0;
+}
+
+//  find and delete key on page by marking delete flag bit
+//  if page becomes empty, delete it from the btree
+
+BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
+{
+BtLatchSet *set;
+BtPool *pool;
+BtPage page;
+uid page_no;
+BtKey ptr;
+uint slot;
+
+       if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
+               return bt->err;
+
+       page_no = bt->page_no;
+       page = bt->page;
+       pool = bt->pool;
+       set = bt->set;
+
+       // if key is found delete it, otherwise ignore request
+
+       ptr = keyptr(page, slot);
+
+       if( bt->found = !keycmp (ptr, key, len) )
+         if( bt->found = slotptr(page, slot)->dead == 0 ) {
+               slotptr(page,slot)->dead = 1;
+                 if( slot < page->cnt )
+                       page->dirty = 1;
+                 if( !--page->act )
+                       if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
+                         return bt->err;
+               }
+
+       bt_unlockpage(BtLockWrite, set);
        bt_unpinlatch (set);
        bt_unpinpool (pool);
-       return 0;
+       return bt->err = 0;
 }
 
 //     find key in leaf level and return row-id
@@ -1736,7 +2009,7 @@ uid id;
        // if key exists, return row-id
        //      otherwise return 0
 
-       if( ptr->len == len && !memcmp (ptr->key, key, len) )
+       if( slot <= bt->page->cnt && !keycmp (ptr, key, len) )
                id = bt_getid(slotptr(bt->page,slot)->id);
        else
                id = 0;
@@ -1749,16 +2022,15 @@ uid id;
 
 //     check page for space available,
 //     clean if necessary and return
-//     =0 - page needs splitting
-//     >0 - go ahead at returned slot
+//     0 - page needs splitting
+//     >0  new slot value
 
-uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
+uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
 {
 uint nxt = bt->mgr->page_size;
-BtPage page = bt->page;
 uint cnt = 0, idx = 0;
 uint max = page->cnt;
-uint newslot;
+uint newslot = max;
 BtKey key;
 
        if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
@@ -1777,15 +2049,19 @@ BtKey key;
        page->dirty = 0;
        page->act = 0;
 
-       // always leave fence key in list
+       // try cleaning up page first
+
+       // always leave fence key in the array
+       // otherwise, remove deleted key
 
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 1;
-               else if( cnt < max && slotptr(bt->frame,cnt)->dead )
+               if( cnt < max && slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy key
+
                key = keyptr(bt->frame, cnt);
                nxt -= key->len + 1;
                memcpy ((unsigned char *)page + nxt, key, key->len + 1);
@@ -1797,26 +2073,59 @@ BtKey key;
                slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
                slotptr(page, idx)->off = nxt;
        }
+
        page->min = nxt;
        page->cnt = idx;
 
+       //      see if page has enough space now, or does it need splitting?
+
        if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
                return newslot;
 
        return 0;
 }
 
-// split the root and raise the height of the btree
+//     add key to current page
+//     page must already be writelocked
 
-BTERR bt_splitroot(BtDb *bt,  unsigned char *newkey, unsigned char *oldkey, uid page_no2)
+void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
+{
+uint idx;
+
+       // find next available dead slot and copy key onto page
+
+       for( idx = slot; idx < page->cnt; idx++ )
+         if( slotptr(page, idx)->dead )
+               break;
+
+       if( idx == page->cnt )
+               idx++, page->cnt++;
+
+       page->act++;
+
+       // now insert key into array before slot
+
+       while( idx > slot )
+               *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
+
+       page->min -= len + 1;
+       ((unsigned char *)page)[page->min] = len;
+       memcpy ((unsigned char *)page + page->min +1, key, len );
+
+       bt_putid(slotptr(page,slot)->id, id);
+       slotptr(page, slot)->off = page->min;
+       slotptr(page, slot)->tod = tod;
+       slotptr(page, slot)->dead = 0;
+}
+
+BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
 {
 uint nxt = bt->mgr->page_size;
 BtPage root = bt->page;
 uid new_page;
 
        //  Obtain an empty page to use, and copy the current
-       //  root contents into it which is the lower half of
-       //      the old root.
+       //  root contents into it
 
        if( !(new_page = bt_newpage(bt, root)) )
                return bt->err;
@@ -1828,16 +2137,17 @@ uid new_page;
 
        // insert first key on newroot page
 
-       nxt -= *newkey + 1;
-       memcpy ((unsigned char *)root + nxt, newkey, *newkey + 1);
+       nxt -= *leftkey + 1;
+       memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
        bt_putid(slotptr(root, 1)->id, new_page);
        slotptr(root, 1)->off = nxt;
        
-       // insert second key on newroot page
+       // insert second key (stopper key) on newroot page
        // and increase the root height
 
-       nxt -= *oldkey + 1;
-       memcpy ((unsigned char *)root + nxt, oldkey, *oldkey + 1);
+       nxt -= 3;
+       *((unsigned char *)root + nxt) = 2;
+       memset ((unsigned char *)root + nxt + 1, 0xff, 2);
        bt_putid(slotptr(root, 2)->id, page_no2);
        slotptr(root, 2)->off = nxt;
 
@@ -1856,31 +2166,26 @@ uid new_page;
 }
 
 //  split already locked full node
-//     return unlocked.
+//     return unlocked and unpinned.
 
-BTERR bt_splitpage (BtDb *bt)
+BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
 {
-uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
-unsigned char oldkey[256], lowerkey[256];
-uid page_no = bt->page_no, right;
-BtLatchSet *nset, *set = bt->set;
-BtPool *pool = bt->pool;
-BtPage page = bt->page;
+uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
+unsigned char rightkey[256], leftkey[256];
+uint tod = time(NULL);
 uint lvl = page->lvl;
 uid new_page;
 BtKey key;
-uint tod;
 
-       //  split higher half of keys to bt->frame
-       //      the last key (fence key) might be dead
-
-       tod = (uint)time(NULL);
+       //      initialize frame buffer for right node
 
        memset (bt->frame, 0, bt->mgr->page_size);
-       max = (int)page->cnt;
+       max = page->cnt;
        cnt = max / 2;
        idx = 0;
 
+       //  split higher half of keys to bt->frame
+
        while( cnt++ < max ) {
                key = keyptr(page, cnt);
                nxt -= key->len + 1;
@@ -1892,166 +2197,142 @@ uint tod;
                slotptr(bt->frame, idx)->off = nxt;
        }
 
-       // remember existing fence key for new page to the right
+       // transfer right link node to new right node
 
-       memcpy (oldkey, key, key->len + 1);
+       if( page_no > ROOT_page )
+               memcpy (bt->frame->right, page->right, BtId);
 
        bt->frame->bits = bt->mgr->page_bits;
        bt->frame->min = nxt;
        bt->frame->cnt = idx;
        bt->frame->lvl = lvl;
 
-       // link right node
-
-       if( page_no > ROOT_page ) {
-               right = bt_getid (page->right);
-               bt_putid(bt->frame->right, right);
-       }
-
-       //      get new free page and write frame to it.
+       //      get new free page and write right frame to it.
 
        if( !(new_page = bt_newpage(bt, bt->frame)) )
                return bt->err;
 
+       //      remember fence key for new right page to add
+       //      as right sibling to the left node
+
+       key = keyptr(bt->frame, idx);
+       memcpy (rightkey, key, key->len + 1);
+
        //      update lower keys to continue in old page
 
        memcpy (bt->frame, page, bt->mgr->page_size);
        memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
        nxt = bt->mgr->page_size;
+       page->dirty = 0;
        page->act = 0;
        cnt = 0;
        idx = 0;
 
        //  assemble page of smaller keys
-       //      (they're all active keys)
+       //      to remain in the old page
 
        while( cnt++ < max / 2 ) {
                key = keyptr(bt->frame, cnt);
                nxt -= key->len + 1;
                memcpy ((unsigned char *)page + nxt, key, key->len + 1);
-               memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
+               memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
+               if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
+                       page->act++;
                slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
                slotptr(page, idx)->off = nxt;
-               page->act++;
        }
 
-       // remember fence key for old page
+       // finalize left page and save fence key
 
-       memcpy(lowerkey, key, key->len + 1);
-       bt_putid(page->right, new_page);
+       memcpy(leftkey, key, key->len + 1);
        page->min = nxt;
        page->cnt = idx;
 
+       //      link new right page
+
+       bt_putid (page->right, new_page);
+
        // if current page is the root page, split it
 
        if( page_no == ROOT_page )
-               return bt_splitroot (bt, lowerkey, oldkey, new_page);
+               return bt_splitroot (bt, leftkey, new_page);
 
-       // obtain Parent/Write locks
-       // for left and right node pages
+       //  obtain ParentModification lock for current page
 
-       nset = bt_pinlatch (bt, new_page);
-
-       bt_lockpage (BtLockParent, nset);
        bt_lockpage (BtLockParent, set);
 
-       //  release wr lock on left page
-       //  (keep the SMO in sequence)
+       //  release wr lock on our page.
+       //      this will keep out another SMO
 
        bt_unlockpage (BtLockWrite, set);
 
-       // insert new fence for reformulated left block
+       //  insert key for old page (lower keys)
 
-       if( bt_insertkey (bt, lowerkey+1, *lowerkey, lvl + 1, page_no, tod) )
+       if( bt_insertkey (bt, leftkey + 1, *leftkey, page_no, tod, lvl + 1) )
                return bt->err;
 
-       // fix old fence for newly allocated right block page
+       //      switch old parent key from us to our right page
 
-       if( bt_insertkey (bt, oldkey+1, *oldkey, lvl + 1, new_page, tod) )
+       if( bt_insertkey (bt, rightkey + 1, *rightkey, new_page, tod, lvl + 1) )
                return bt->err;
 
-       // release Parent locks
+       //      unlock and unpin
 
-       bt_unlockpage (BtLockParent, nset);
        bt_unlockpage (BtLockParent, set);
-       bt_unpinlatch (nset);
        bt_unpinlatch (set);
        bt_unpinpool (pool);
        return 0;
 }
 
-//  Insert new key into the btree at requested level.
-//  Level zero pages are leaf pages. Page is unlocked at exit.
+//  Insert new key into the btree at given level.
 
-BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
+BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
 {
 uint slot, idx;
 BtPage page;
 BtKey ptr;
 
-  while( 1 ) {
-       if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
-               ptr = keyptr(bt->page, slot);
-       else
-       {
-               if ( !bt->err )
-                       bt->err = BTERR_ovflw;
-               return bt->err;
-       }
-
-       // if key already exists, update id and return
-
-       page = bt->page;
-
-       if( bt->found = !keycmp (ptr, key, len) ) {
-               slotptr(page, slot)->dead = 0;
-               slotptr(page, slot)->tod = tod;
-               bt_putid(slotptr(page,slot)->id, id);
-               bt_unlockpage(BtLockWrite, bt->set);
-               bt_unpinlatch(bt->set);
-               bt_unpinpool (bt->pool);
-               return bt->err;
-       }
-
-       // check if page has enough space
-
-       if( slot = bt_cleanpage (bt, len, slot) )
-               break;
-
-       if( bt_splitpage (bt) )
-               return bt->err;
-  }
-
-  // calculate next available slot and copy key into page
+       while( 1 ) {
+               if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
+                       ptr = keyptr(bt->page, slot);
+               else
+               {
+                       if ( !bt->err )
+                               bt->err = BTERR_ovflw;
+                       return bt->err;
+               }
 
-  page->min -= len + 1; // reset lowest used offset
-  ((unsigned char *)page)[page->min] = len;
-  memcpy ((unsigned char *)page + page->min +1, key, len );
+               // if key already exists, update id and return
 
-  for( idx = slot; idx < page->cnt; idx++ )
-       if( slotptr(page, idx)->dead )
-               break;
+               page = bt->page;
 
-  // now insert key into array before slot
-  // preserving the fence slot
+               if( !keycmp (ptr, key, len) ) {
+                       if( slotptr(page, slot)->dead )
+                               page->act++;
+                       slotptr(page, slot)->dead = 0;
+                       slotptr(page, slot)->tod = tod;
+                       bt_putid(slotptr(page,slot)->id, id);
+                       bt_unlockpage(BtLockWrite, bt->set);
+                       bt_unpinlatch (bt->set);
+                       bt_unpinpool (bt->pool);
+                       return bt->err;
+               }
 
-  if( idx == page->cnt )
-       idx++, page->cnt++;
+               // check if page has enough space
 
-  page->act++;
+               if( slot = bt_cleanpage (bt, bt->page, len, slot) )
+                       break;
 
-  while( idx > slot )
-       *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
+               if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+                       return bt->err;
+       }
 
-  bt_putid(slotptr(page,slot)->id, id);
-  slotptr(page, slot)->off = page->min;
-  slotptr(page, slot)->tod = tod;
-  slotptr(page, slot)->dead = 0;
+       bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
 
-  bt_unlockpage (BtLockWrite, bt->set);
-  bt_unpinlatch (bt->set);
-  bt_unpinpool (bt->pool);
-  return 0;
+       bt_unlockpage (BtLockWrite, bt->set);
+       bt_unpinlatch (bt->set);
+       bt_unpinpool (bt->pool);
+       return 0;
 }
 
 //  cache page of keys into cursor and return starting slot for given key
@@ -2063,7 +2344,9 @@ uint slot;
        // cache page for retrieval
        if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
                memcpy (bt->cursor, bt->page, bt->mgr->page_size);
+
        bt->cursor_page = bt->page_no;
+
        bt_unlockpage(BtLockRead, bt->set);
        bt_unpinlatch (bt->set);
        bt_unpinpool (bt->pool);
@@ -2075,6 +2358,7 @@ uint slot;
 
 uint bt_nextkey (BtDb *bt, uint slot)
 {
+BtLatchSet *set;
 BtPool *pool;
 BtPage page;
 uid right;
@@ -2084,7 +2368,7 @@ uid right;
        while( slot++ < bt->cursor->cnt )
          if( slotptr(bt->cursor,slot)->dead )
                continue;
-         else if( right || (slot < bt->cursor->cnt))
+         else if( right || (slot < bt->cursor->cnt) )
                return slot;
          else
                break;
@@ -2093,19 +2377,18 @@ uid right;
                break;
 
        bt->cursor_page = right;
-
        if( pool = bt_pinpool (bt, right) )
                page = bt_page (bt, pool, right);
        else
                return 0;
 
-       bt->set = bt_pinlatch (bt, right);
-    bt_lockpage(BtLockRead, bt->set);
+       set = bt_pinlatch (bt, right);
+    bt_lockpage(BtLockRead, set);
 
        memcpy (bt->cursor, page, bt->mgr->page_size);
 
-       bt_unlockpage(BtLockRead, bt->set);
-       bt_unpinlatch (bt->set);
+       bt_unlockpage(BtLockRead, set);
+       bt_unpinlatch (set);
        bt_unpinpool (pool);
        slot = 0;
   } while( 1 );
@@ -2128,55 +2411,8 @@ uint bt_tod(BtDb *bt, uint slot)
        return slotptr(bt->cursor,slot)->tod;
 }
 
-#ifdef STANDALONE
-
-void bt_latchaudit (BtDb *bt)
-{
-ushort idx, hashidx;
-BtLatchSet *set;
-BtPool *pool;
-BtPage page;
-uid page_no;
-
-#ifdef unix
-       for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
-               set = bt->mgr->latchsets + idx;
-               if( *(ushort *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent ) {
-                       fprintf(stderr, "latchset %d locked for page %6x\n", idx, set->page_no);
-                       *(ushort *)set->readwr = 0;
-                       *(ushort *)set->access = 0;
-                       *(ushort *)set->parent = 0;
-               }
-               if( set->pin ) {
-                       fprintf(stderr, "latchset %d pinned\n", idx);
-                       set->pin = 0;
-               }
-       }
-
-       for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
-         if( *(uint *)bt->mgr->latchmgr->table[hashidx].latch )
-               fprintf(stderr, "latchmgr locked\n");
-         if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
-               set = bt->mgr->latchsets + idx;
-               if( *(uint *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent )
-                       fprintf(stderr, "latchset %d locked\n", idx);
-               if( set->hash != hashidx )
-                       fprintf(stderr, "latchset %d wrong hashidx\n", idx);
-               if( set->pin )
-                       fprintf(stderr, "latchset %d pinned\n", idx);
-         } while( idx = set->next );
-       }
-       page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
 
-       while( page_no ) {
-               fprintf(stderr, "free: %.6x\n", (uint)page_no);
-               pool = bt_pinpool (bt, page_no);
-               page = bt_page (bt, pool, page_no);
-           page_no = bt_getid(page->right);
-               bt_unpinpool (pool);
-       }
-#endif
-}
+#ifdef STANDALONE
 
 typedef struct {
        char type, idx;
@@ -2199,6 +2435,7 @@ uid next, page_no = LEAF_page;    // start on first page of leaves
 unsigned char key[256];
 ThreadArg *args = arg;
 int ch, len = 0, slot;
+BtLatchSet *set;
 time_t tod[1];
 BtPool *pool;
 BtPage page;
@@ -2211,12 +2448,6 @@ FILE *in;
 
        switch(args->type | 0x20)
        {
-       case 'a':
-               fprintf(stderr, "started latch mgr audit\n");
-               bt_latchaudit (bt);
-               fprintf(stderr, "finished latch mgr audit\n");
-               break;
-
        case 'w':
                fprintf(stderr, "started indexing for %s\n", args->infile);
                if( in = fopen (args->infile, "rb") )
@@ -2231,7 +2462,7 @@ FILE *in;
                          else if( args->num )
                                sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
 
-                         if( bt_insertkey (bt, key, len, 0, line, *tod) )
+                         if( bt_insertkey (bt, key, len, line, *tod, 0) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
@@ -2253,7 +2484,7 @@ FILE *in;
                          else if( args->num )
                                sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
 
-                         if( bt_deletekey (bt, key, len, 0) )
+                         if( bt_deletekey (bt, key, len) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
@@ -2308,17 +2539,17 @@ FILE *in;
                fprintf(stderr, "started reading\n");
 
                do {
-                       if( bt->pool = bt_pinpool (bt, page_no) )
-                               page = bt_page (bt, bt->pool, page_no);
+                       if( pool = bt_pinpool (bt, page_no) )
+                               page = bt_page (bt, pool, page_no);
                        else
                                break;
-                       bt->set = bt_pinlatch (bt, page_no);
-                       bt_lockpage (BtLockRead, bt->set);
+                       set = bt_pinlatch (bt, page_no);
+                       bt_lockpage (BtLockRead, set);
                        cnt += page->act;
                        next = bt_getid (page->right);
-                       bt_unlockpage (BtLockRead, bt->set);
-                       bt_unpinlatch (bt->set);
-                       bt_unpinpool (bt->pool);
+                       bt_unlockpage (BtLockRead, set);
+                       bt_unpinlatch (set);
+                       bt_unpinpool (pool);
                } while( page_no = next );
 
                cnt--;  // remove stopper key
index 25feba3b5ef345f7b97ff78cdf177cbe4835d56e..e53e31d8443209da97ce568dc3bc78922ee1e3ca 100644 (file)
@@ -1,5 +1,5 @@
 // btree version threads2i sched_yield version
-// 24 JAN 2014
+// 29 JAN 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -162,16 +162,13 @@ typedef struct {
 //     by the BtSlot array of keys.
 
 typedef struct Page {
-       BtLatchSet latch[1];            // Set of three latches
        uint cnt;                                       // count of keys in page
        uint act;                                       // count of active keys
        uint min;                                       // next key offset
        unsigned char bits;                     // page size in bits
-       unsigned char lvl:6;            // level of page
-       unsigned char kill:1;           // page is being deleted
+       unsigned char lvl:7;            // level of page
        unsigned char dirty:1;          // page has deleted keys
        unsigned char right[BtId];      // page number to right
-       BtSlot table[0];                        // array of key slots
 } *BtPage;
 
 //     The memory mapping pool table buffer manager entry
@@ -185,7 +182,7 @@ typedef struct {
        void *hashprev;                         // previous pool entry for the same hash idx
        void *hashnext;                         // next pool entry for the same hash idx
 #ifndef unix
-       HANDLE hmap;
+       HANDLE hmap;                            // Windows memory mapping handle
 #endif
 } BtPool;
 
@@ -241,6 +238,7 @@ typedef struct {
        BtLatchSet *set;        // current page latch set
        BtPool *pool;           // current page pool
        unsigned char *mem;     // frame, cursor, page memory buffer
+       int parent;                     // last loadpage was from a parent level
        int found;                      // last delete or insert was found
        int err;                        // last error
 } BtDb;
@@ -258,12 +256,17 @@ typedef enum {
 // B-Tree functions
 extern void bt_close (BtDb *bt);
 extern BtDb *bt_open (BtMgr *mgr);
-extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
-extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
+extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl);
+extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_nextkey   (BtDb *bt, uint slot);
 
+//     internal functions
+BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
+uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
+BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
+
 //     manager functions
 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
 void bt_mgrclose (BtMgr *mgr);
@@ -331,7 +334,7 @@ extern uint bt_tod (BtDb *bt, uint slot);
 
 //     Access macros to address slot and key values from the page
 
-#define slotptr(page, slot) (page->table + slot-1)
+#define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
 
 void bt_putid(unsigned char *dest, uid id)
@@ -1414,6 +1417,7 @@ BtLatchSet *set, *prevset;
 uint drill = 0xff, slot;
 uint mode, prevmode;
 BtPool *prevpool;
+int parent = 1;
 
   //  start at root of btree and drill down
 
@@ -1473,30 +1477,33 @@ BtPool *prevpool;
        //  find key on page at this level
        //  and descend to requested level
 
-       if( !bt->page->kill && (slot = bt_findslot (bt, key, len)) ) {
+       if( slot = bt_findslot (bt, key, len) ) {
          if( drill == lvl )
-               return slot;
+               return bt->parent = parent, slot;
 
          while( slotptr(bt->page, slot)->dead )
                if( slot++ < bt->page->cnt )
                        continue;
                else {
                        page_no = bt_getid(bt->page->right);
+                       parent = 0;
                        goto slideright;
                }
 
          page_no = bt_getid(slotptr(bt->page, slot)->id);
+         parent = 1;
          drill--;
        }
 
        //  or slide right into next page
-       //  (slide left from deleted page)
 
-       else
+       else {
                page_no = bt_getid(bt->page->right);
+               parent = 0;
+       }
 
        //  continue down / right using overlapping locks
-       //  to protect pages being killed or split.
+       //  to protect pages being split.
 
 slideright:
        prevpage = bt->page_no;
@@ -1511,119 +1518,384 @@ slideright:
   return 0;    // return error
 }
 
-//  find and delete key on page by marking delete flag bit
-//  when page becomes empty, delete it
+//  remove empty page from the B-tree
+//     by pulling our right node left over ourselves
+
+//  call with bt->page, etc, set to page's locked parent
+//     returns with page locked.
 
-BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
+BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
 {
-unsigned char lowerkey[256], higherkey[256];
-BtLatchSet *rset, *set;
-BtPool *pool, *rpool;
-uid page_no, right;
-uint slot, tod;
-BtPage rpage;
+BtLatchSet *rset, *pset, *rpset;
+BtPool *rpool, *ppool, *rppool;
+BtPage rpage, ppage, rppage;
+uid right, parent, rparent;
 BtKey ptr;
+uint idx;
 
-       if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
-               ptr = keyptr(bt->page, slot);
+       //      cache node's parent page
+
+       parent = bt->page_no;
+       ppage = bt->page;
+       ppool = bt->pool;
+       pset = bt->set;
+
+       // lock and map our right page
+       //      it cannot be NULL because of the stopper
+       //      in the last right page
+
+       bt_lockpage (BtLockWrite, set);
+
+       // if we aren't dead yet
+
+       if( page->act )
+               goto rmergexit;
+
+       if( right = bt_getid (page->right) )
+         if( rpool = bt_pinpool (bt, right) )
+               rpage = bt_page (bt, rpool, right);
+         else
+               return bt->err;
        else
+               return bt->err = BTERR_struct;
+
+       rset = bt_pinlatch (bt, right);
+
+       //      find our right neighbor
+
+       if( ppage->act > 1 ) {
+        for( idx = slot; idx++ < ppage->cnt; )
+         if( !slotptr(ppage, idx)->dead )
+               break;
+
+        if( idx > ppage->cnt )
+               return bt->err = BTERR_struct;
+
+        //  redirect right neighbor in parent to left node
+
+        bt_putid(slotptr(ppage,idx)->id, page_no);
+       }
+
+       //      if parent has only our deleted page, e.g. no right neighbor
+       //      prepare to merge parent itself
+
+       if( ppage->act == 1 ) {
+         if( rparent = bt_getid (ppage->right) )
+          if( rppool = bt_pinpool (bt, rparent) )
+               rppage = bt_page (bt, rppool, rparent);
+          else
                return bt->err;
+         else
+               return bt->err = BTERR_struct;
 
-       // if key is found delete it, otherwise ignore request
+         rpset = bt_pinlatch (bt, rparent);
+         bt_lockpage (BtLockWrite, rpset);
 
-       if( bt->found = !keycmp (ptr, key, len) )
-               if( bt->found = slotptr(bt->page, slot)->dead == 0 ) {
-                       slotptr(bt->page,slot)->dead = 1;
-                       if( slot < bt->page->cnt )
-                               bt->page->dirty = 1;
-                       bt->page->act--;
+         // find our right neighbor on right parent page
+
+         for( idx = 0; idx++ < rppage->cnt; )
+               if( !slotptr(rppage, idx)->dead ) {
+                 bt_putid (slotptr(rppage, idx)->id, page_no);
+                 break;
                }
 
-       // return if page is not empty, or it has no right sibling
+         if( idx > rppage->cnt )
+               return bt->err = BTERR_struct;
+       }
 
-       right = bt_getid(bt->page->right);
-       page_no = bt->page_no;
-       pool = bt->pool;
-       set = bt->set;
+       //      now that there are no more pointers to our right node
+       //      we can wait for delete lock on it
 
-       if( !right || bt->page->act ) {
-               bt_unlockpage(BtLockWrite, set);
-               bt_unpinlatch (set);
-               bt_unpinpool (pool);
+       bt_lockpage(BtLockDelete, rset);
+       bt_lockpage(BtLockWrite, rset);
+
+       // pull contents of right page into our empty page 
+
+       memcpy (page, rpage, bt->mgr->page_size);
+
+       // ready to release right parent lock
+       //      now that we have a new page in place
+
+       if( ppage->act == 1 ) {
+         bt_unlockpage (BtLockWrite, rpset);
+         bt_unpinlatch (rpset);
+         bt_unpinpool (rppool);
+       }
+
+       //      add killed right block to free chain
+       //      lock latch mgr
+
+       bt_spinwritelock(bt->mgr->latchmgr->lock);
+
+       //      store free chain in allocation page second right
+
+       bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+       bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+
+       // unlock latch mgr and right page
+
+       bt_unlockpage(BtLockDelete, rset);
+       bt_unlockpage(BtLockWrite, rset);
+       bt_unpinlatch (rset);
+       bt_unpinpool (rpool);
+
+       bt_spinreleasewrite(bt->mgr->latchmgr->lock);
+
+       // delete our obsolete fence key from our parent
+
+       slotptr(ppage, slot)->dead = 1;
+       ppage->dirty = 1;
+
+       //      if our parent now empty
+       //      remove it from the tree
+
+       if( ppage->act-- == 1 )
+         if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
+               return bt->err;
+
+rmergexit:
+       bt_unlockpage (BtLockWrite, pset);
+       bt_unpinlatch (pset);
+       bt_unpinpool (ppool);
+
+       bt->found = 1;
+       return bt->err = 0;
+}
+
+//  remove empty page from the B-tree
+//     try merging left first.  If no left
+//     sibling, then merge right.
+
+//     call with page loaded and locked,
+//  return with page locked.
+
+BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
+{
+unsigned char fencekey[256], postkey[256];
+uint slot, idx, postfence = 0;
+BtLatchSet *lset, *pset;
+BtPool *lpool, *ppool;
+BtPage lpage, ppage;
+uid left, parent;
+BtKey ptr;
+
+       ptr = keyptr(page, page->cnt);
+       memcpy(fencekey, ptr, ptr->len + 1);
+       bt_unlockpage (BtLockWrite, set);
+
+       //      load and lock our parent
+
+retry:
+       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
                return bt->err;
+
+       parent = bt->page_no;
+       ppage = bt->page;
+       ppool = bt->pool;
+       pset = bt->set;
+
+       //      wait until we are posted in our parent
+
+       if( !bt->parent ) {
+               bt_unlockpage (BtLockWrite, pset);
+               bt_unpinlatch (pset);
+               bt_unpinpool (ppool);
+#ifdef unix
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               goto retry;
        }
 
-       // obtain Parent lock over write lock
+       //  find our left neighbor in our parent page
 
-       bt_lockpage(BtLockParent, set);
+       for( idx = slot; --idx; )
+         if( !slotptr(ppage, idx)->dead )
+               break;
 
-       // keep copy of key to delete
+       //      if no left neighbor, do right merge
 
-       ptr = keyptr(bt->page, bt->page->cnt);
-       memcpy(lowerkey, ptr, ptr->len + 1);
+       if( !idx )
+               return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
 
-       // lock and map right page
+       // lock and map our left neighbor's page
 
-       if( rpool = bt_pinpool (bt, right) )
-               rpage = bt_page (bt, rpool, right);
+       left = bt_getid (slotptr(ppage, idx)->id);
+
+       if( lpool = bt_pinpool (bt, left) )
+               lpage = bt_page (bt, lpool, left);
        else
                return bt->err;
 
-       rset = bt_pinlatch (bt, right);
-       bt_lockpage(BtLockWrite, rset);
+       lset = bt_pinlatch (bt, left);
+       bt_lockpage(BtLockWrite, lset);
 
-       // pull contents of next page into current empty page 
+       //  wait until sibling is in our parent
+
+       if( bt_getid (lpage->right) != page_no ) {
+               bt_unlockpage (BtLockWrite, pset);
+               bt_unpinlatch (pset);
+               bt_unpinpool (ppool);
+               bt_unlockpage (BtLockWrite, lset);
+               bt_unpinlatch (lset);
+               bt_unpinpool (lpool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               goto retry;
+       }
 
-       memcpy (bt->page, rpage, bt->mgr->page_size);
+       //  since our page will have no more pointers to it,
+       //      obtain Delete lock and wait for write locks to clear
 
-       //      keep copy of key to update
+       bt_lockpage(BtLockDelete, set);
+       bt_lockpage(BtLockWrite, set);
 
-       ptr = keyptr(rpage, rpage->cnt);
-       memcpy(higherkey, ptr, ptr->len + 1);
+       //      if we aren't dead yet,
+       //      get ready for exit
 
-       //  Mark right page as deleted and point it to left page
-       //      until we can post updates at higher level.
+       if( page->act ) {
+               bt_unlockpage(BtLockDelete, set);
+               bt_unlockpage(BtLockWrite, lset);
+               bt_unpinlatch (lset);
+               bt_unpinpool (lpool);
+               goto lmergexit;
+       }
 
-       bt_putid(rpage->right, page_no);
-       rpage->kill = 1;
-       rpage->cnt = 0;
+       //      are we are the fence key for our parent?
+       //      if so, grab our old fence key
 
-       bt_unlockpage(BtLockWrite, rset);
-       bt_unlockpage(BtLockWrite, set);
+       if( postfence = slot == ppage->cnt ) {
+               ptr = keyptr (ppage, ppage->cnt);
+               memcpy(fencekey, ptr, ptr->len + 1);
+               memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
 
-       //  delete old lower key to consolidated node
+               // clear out other dead slots
 
-       if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
-               return bt->err;
+               while( --ppage->cnt )
+                 if( slotptr(ppage, ppage->cnt)->dead )
+                       memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
+                 else
+                       break;
 
-       //  redirect higher key directly to consolidated node
+               ptr = keyptr (ppage, ppage->cnt);
+               memcpy(postkey, ptr, ptr->len + 1);
+       } else
+               slotptr(ppage,slot)->dead = 1;
 
-       tod = (uint)time(NULL);
+       ppage->dirty = 1;
+       ppage->act--;
 
-       if( bt_insertkey (bt, higherkey+1, *higherkey, lvl + 1, page_no, tod) )
-               return bt->err;
+       //      push our right neighbor pointer to our left
 
-       //      add killed right block to free chain
+       memcpy (lpage->right, page->right, BtId);
+
+       //      add ourselves to free chain
        //      lock latch mgr
 
        bt_spinwritelock(bt->mgr->latchmgr->lock);
 
        //      store free chain in allocation page second right
-       bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
-       bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+       bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+       bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
 
-       // unlock latch mgr and unpin right page
+       // unlock latch mgr and pages
 
        bt_spinreleasewrite(bt->mgr->latchmgr->lock);
-       bt_unpinlatch (rset);
-       bt_unpinpool (rpool);
+       bt_unlockpage(BtLockWrite, lset);
+       bt_unpinlatch (lset);
+       bt_unpinpool (lpool);
+
+       // release our node's delete lock
+
+       bt_unlockpage(BtLockDelete, set);
 
-       //      remove ParentModify lock
+lmergexit:
+       bt_unlockpage (BtLockWrite, pset);
+       bt_unpinpool (ppool);
+
+       //  do we need to post parent's fence key in its parent?
+
+       if( !postfence || parent == ROOT_page ) {
+               bt_unpinlatch (pset);
+               bt->found = 1;
+               return bt->err = 0;
+       }
+
+       //      interlock parent fence post
+
+       bt_lockpage (BtLockParent, pset);
+
+       //      load parent's parent page
+posttry:
+       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
+               return bt->err;
 
-       bt_unlockpage(BtLockParent, set);
+       if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
+         if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+               return bt->err;
+         else
+               goto posttry;
+
+       page = bt->page;
+
+       page->min -= *postkey + 1;
+       ((unsigned char *)page)[page->min] = *postkey;
+       memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
+       slotptr(page, slot)->off = page->min;
+
+       bt_unlockpage (BtLockParent, pset);
+       bt_unpinlatch (pset);
+
+       bt_unlockpage (BtLockWrite, bt->set);
+       bt_unpinlatch (bt->set);
+       bt_unpinpool (bt->pool);
+
+       bt->found = 1;
+       return bt->err = 0;
+}
+
+//  find and delete key on page by marking delete flag bit
+//  if page becomes empty, delete it from the btree
+
+BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
+{
+BtLatchSet *set;
+BtPool *pool;
+BtPage page;
+uid page_no;
+BtKey ptr;
+uint slot;
+
+       if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
+               return bt->err;
+
+       page_no = bt->page_no;
+       page = bt->page;
+       pool = bt->pool;
+       set = bt->set;
+
+       // if key is found delete it, otherwise ignore request
+
+       ptr = keyptr(page, slot);
+
+       if( bt->found = !keycmp (ptr, key, len) )
+         if( bt->found = slotptr(page, slot)->dead == 0 ) {
+               slotptr(page,slot)->dead = 1;
+                 if( slot < page->cnt )
+                       page->dirty = 1;
+                 if( !--page->act )
+                       if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
+                         return bt->err;
+               }
+
+       bt_unlockpage(BtLockWrite, set);
        bt_unpinlatch (set);
        bt_unpinpool (pool);
-       return 0;
+       return bt->err = 0;
 }
 
 //     find key in leaf level and return row-id
@@ -1642,7 +1914,7 @@ uid id;
        // if key exists, return row-id
        //      otherwise return 0
 
-       if( ptr->len == len && !memcmp (ptr->key, key, len) )
+       if( slot <= bt->page->cnt && !keycmp (ptr, key, len) )
                id = bt_getid(slotptr(bt->page,slot)->id);
        else
                id = 0;
@@ -1655,16 +1927,15 @@ uid id;
 
 //     check page for space available,
 //     clean if necessary and return
-//     =0 - page needs splitting
-//     >0 - go ahead at returned slot
+//     0 - page needs splitting
+//     >0  new slot value
 
-uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
+uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
 {
 uint nxt = bt->mgr->page_size;
-BtPage page = bt->page;
 uint cnt = 0, idx = 0;
 uint max = page->cnt;
-uint newslot;
+uint newslot = max;
 BtKey key;
 
        if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
@@ -1683,15 +1954,19 @@ BtKey key;
        page->dirty = 0;
        page->act = 0;
 
-       // always leave fence key in list
+       // try cleaning up page first
+
+       // always leave fence key in the array
+       // otherwise, remove deleted key
 
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 1;
-               else if( cnt < max && slotptr(bt->frame,cnt)->dead )
+               if( cnt < max && slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy key
+
                key = keyptr(bt->frame, cnt);
                nxt -= key->len + 1;
                memcpy ((unsigned char *)page + nxt, key, key->len + 1);
@@ -1703,26 +1978,59 @@ BtKey key;
                slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
                slotptr(page, idx)->off = nxt;
        }
+
        page->min = nxt;
        page->cnt = idx;
 
+       //      see if page has enough space now, or does it need splitting?
+
        if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
                return newslot;
 
        return 0;
 }
 
-// split the root and raise the height of the btree
+//     add key to current page
+//     page must already be writelocked
+
+void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
+{
+uint idx;
+
+       // find next available dead slot and copy key onto page
+
+       for( idx = slot; idx < page->cnt; idx++ )
+         if( slotptr(page, idx)->dead )
+               break;
+
+       if( idx == page->cnt )
+               idx++, page->cnt++;
+
+       page->act++;
+
+       // now insert key into array before slot
 
-BTERR bt_splitroot(BtDb *bt,  unsigned char *newkey, unsigned char *oldkey, uid page_no2)
+       while( idx > slot )
+               *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
+
+       page->min -= len + 1;
+       ((unsigned char *)page)[page->min] = len;
+       memcpy ((unsigned char *)page + page->min +1, key, len );
+
+       bt_putid(slotptr(page,slot)->id, id);
+       slotptr(page, slot)->off = page->min;
+       slotptr(page, slot)->tod = tod;
+       slotptr(page, slot)->dead = 0;
+}
+
+BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
 {
 uint nxt = bt->mgr->page_size;
 BtPage root = bt->page;
 uid new_page;
 
        //  Obtain an empty page to use, and copy the current
-       //  root contents into it which is the lower half of
-       //      the old root.
+       //  root contents into it
 
        if( !(new_page = bt_newpage(bt, root)) )
                return bt->err;
@@ -1734,16 +2042,17 @@ uid new_page;
 
        // insert first key on newroot page
 
-       nxt -= *newkey + 1;
-       memcpy ((unsigned char *)root + nxt, newkey, *newkey + 1);
+       nxt -= *leftkey + 1;
+       memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
        bt_putid(slotptr(root, 1)->id, new_page);
        slotptr(root, 1)->off = nxt;
        
-       // insert second key on newroot page
+       // insert second key (stopper key) on newroot page
        // and increase the root height
 
-       nxt -= *oldkey + 1;
-       memcpy ((unsigned char *)root + nxt, oldkey, *oldkey + 1);
+       nxt -= 3;
+       *((unsigned char *)root + nxt) = 2;
+       memset ((unsigned char *)root + nxt + 1, 0xff, 2);
        bt_putid(slotptr(root, 2)->id, page_no2);
        slotptr(root, 2)->off = nxt;
 
@@ -1762,31 +2071,26 @@ uid new_page;
 }
 
 //  split already locked full node
-//     return unlocked.
+//     return unlocked and unpinned.
 
-BTERR bt_splitpage (BtDb *bt)
+BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
 {
-uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
-unsigned char oldkey[256], lowerkey[256];
-uid page_no = bt->page_no, right;
-BtLatchSet *nset, *set = bt->set;
-BtPool *pool = bt->pool;
-BtPage page = bt->page;
+uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
+unsigned char rightkey[256], leftkey[256];
+uint tod = time(NULL);
 uint lvl = page->lvl;
 uid new_page;
 BtKey key;
-uint tod;
 
-       //  split higher half of keys to bt->frame
-       //      the last key (fence key) might be dead
-
-       tod = (uint)time(NULL);
+       //      initialize frame buffer for right node
 
        memset (bt->frame, 0, bt->mgr->page_size);
-       max = (int)page->cnt;
+       max = page->cnt;
        cnt = max / 2;
        idx = 0;
 
+       //  split higher half of keys to bt->frame
+
        while( cnt++ < max ) {
                key = keyptr(page, cnt);
                nxt -= key->len + 1;
@@ -1798,166 +2102,142 @@ uint tod;
                slotptr(bt->frame, idx)->off = nxt;
        }
 
-       // remember existing fence key for new page to the right
+       // transfer right link node to new right node
 
-       memcpy (oldkey, key, key->len + 1);
+       if( page_no > ROOT_page )
+               memcpy (bt->frame->right, page->right, BtId);
 
        bt->frame->bits = bt->mgr->page_bits;
        bt->frame->min = nxt;
        bt->frame->cnt = idx;
        bt->frame->lvl = lvl;
 
-       // link right node
-
-       if( page_no > ROOT_page ) {
-               right = bt_getid (page->right);
-               bt_putid(bt->frame->right, right);
-       }
-
-       //      get new free page and write frame to it.
+       //      get new free page and write right frame to it.
 
        if( !(new_page = bt_newpage(bt, bt->frame)) )
                return bt->err;
 
+       //      remember fence key for new right page to add
+       //      as right sibling to the left node
+
+       key = keyptr(bt->frame, idx);
+       memcpy (rightkey, key, key->len + 1);
+
        //      update lower keys to continue in old page
 
        memcpy (bt->frame, page, bt->mgr->page_size);
        memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
        nxt = bt->mgr->page_size;
+       page->dirty = 0;
        page->act = 0;
        cnt = 0;
        idx = 0;
 
        //  assemble page of smaller keys
-       //      (they're all active keys)
+       //      to remain in the old page
 
        while( cnt++ < max / 2 ) {
                key = keyptr(bt->frame, cnt);
                nxt -= key->len + 1;
                memcpy ((unsigned char *)page + nxt, key, key->len + 1);
-               memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
+               memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
+               if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
+                       page->act++;
                slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
                slotptr(page, idx)->off = nxt;
-               page->act++;
        }
 
-       // remember fence key for old page
+       // finalize left page and save fence key
 
-       memcpy(lowerkey, key, key->len + 1);
-       bt_putid(page->right, new_page);
+       memcpy(leftkey, key, key->len + 1);
        page->min = nxt;
        page->cnt = idx;
 
+       //      link new right page
+
+       bt_putid (page->right, new_page);
+
        // if current page is the root page, split it
 
        if( page_no == ROOT_page )
-               return bt_splitroot (bt, lowerkey, oldkey, new_page);
-
-       // obtain Parent/Write locks
-       // for left and right node pages
+               return bt_splitroot (bt, leftkey, new_page);
 
-       nset = bt_pinlatch (bt, new_page);
+       //  obtain ParentModification lock for current page
 
-       bt_lockpage (BtLockParent, nset);
        bt_lockpage (BtLockParent, set);
 
-       //  release wr lock on left page
-       //  (keep the SMO in sequence)
+       //  release wr lock on our page.
+       //      this will keep out another SMO
 
        bt_unlockpage (BtLockWrite, set);
 
-       // insert new fence for reformulated left block
+       //  insert key for old page (lower keys)
 
-       if( bt_insertkey (bt, lowerkey+1, *lowerkey, lvl + 1, page_no, tod) )
+       if( bt_insertkey (bt, leftkey + 1, *leftkey, page_no, tod, lvl + 1) )
                return bt->err;
 
-       // fix old fence for newly allocated right block page
+       //      switch old parent key from us to our right page
 
-       if( bt_insertkey (bt, oldkey+1, *oldkey, lvl + 1, new_page, tod) )
+       if( bt_insertkey (bt, rightkey + 1, *rightkey, new_page, tod, lvl + 1) )
                return bt->err;
 
-       // release Parent locks
+       //      unlock and unpin
 
-       bt_unlockpage (BtLockParent, nset);
        bt_unlockpage (BtLockParent, set);
-       bt_unpinlatch (nset);
        bt_unpinlatch (set);
        bt_unpinpool (pool);
        return 0;
 }
 
-//  Insert new key into the btree at requested level.
-//  Level zero pages are leaf pages. Page is unlocked at exit.
+//  Insert new key into the btree at given level.
 
-BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
+BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
 {
 uint slot, idx;
 BtPage page;
 BtKey ptr;
 
-  while( 1 ) {
-       if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
-               ptr = keyptr(bt->page, slot);
-       else
-       {
-               if ( !bt->err )
-                       bt->err = BTERR_ovflw;
-               return bt->err;
-       }
-
-       // if key already exists, update id and return
-
-       page = bt->page;
-
-       if( bt->found = !keycmp (ptr, key, len) ) {
-               slotptr(page, slot)->dead = 0;
-               slotptr(page, slot)->tod = tod;
-               bt_putid(slotptr(page,slot)->id, id);
-               bt_unlockpage(BtLockWrite, bt->set);
-               bt_unpinlatch(bt->set);
-               bt_unpinpool (bt->pool);
-               return bt->err;
-       }
-
-       // check if page has enough space
-
-       if( slot = bt_cleanpage (bt, len, slot) )
-               break;
-
-       if( bt_splitpage (bt) )
-               return bt->err;
-  }
-
-  // calculate next available slot and copy key into page
+       while( 1 ) {
+               if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
+                       ptr = keyptr(bt->page, slot);
+               else
+               {
+                       if ( !bt->err )
+                               bt->err = BTERR_ovflw;
+                       return bt->err;
+               }
 
-  page->min -= len + 1; // reset lowest used offset
-  ((unsigned char *)page)[page->min] = len;
-  memcpy ((unsigned char *)page + page->min +1, key, len );
+               // if key already exists, update id and return
 
-  for( idx = slot; idx < page->cnt; idx++ )
-       if( slotptr(page, idx)->dead )
-               break;
+               page = bt->page;
 
-  // now insert key into array before slot
-  // preserving the fence slot
+               if( !keycmp (ptr, key, len) ) {
+                       if( slotptr(page, slot)->dead )
+                               page->act++;
+                       slotptr(page, slot)->dead = 0;
+                       slotptr(page, slot)->tod = tod;
+                       bt_putid(slotptr(page,slot)->id, id);
+                       bt_unlockpage(BtLockWrite, bt->set);
+                       bt_unpinlatch (bt->set);
+                       bt_unpinpool (bt->pool);
+                       return bt->err;
+               }
 
-  if( idx == page->cnt )
-       idx++, page->cnt++;
+               // check if page has enough space
 
-  page->act++;
+               if( slot = bt_cleanpage (bt, bt->page, len, slot) )
+                       break;
 
-  while( idx > slot )
-       *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
+               if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+                       return bt->err;
+       }
 
-  bt_putid(slotptr(page,slot)->id, id);
-  slotptr(page, slot)->off = page->min;
-  slotptr(page, slot)->tod = tod;
-  slotptr(page, slot)->dead = 0;
+       bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
 
-  bt_unlockpage (BtLockWrite, bt->set);
-  bt_unpinlatch (bt->set);
-  bt_unpinpool (bt->pool);
-  return 0;
+       bt_unlockpage (BtLockWrite, bt->set);
+       bt_unpinlatch (bt->set);
+       bt_unpinpool (bt->pool);
+       return 0;
 }
 
 //  cache page of keys into cursor and return starting slot for given key
@@ -1969,7 +2249,9 @@ uint slot;
        // cache page for retrieval
        if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
                memcpy (bt->cursor, bt->page, bt->mgr->page_size);
+
        bt->cursor_page = bt->page_no;
+
        bt_unlockpage(BtLockRead, bt->set);
        bt_unpinlatch (bt->set);
        bt_unpinpool (bt->pool);
@@ -1981,6 +2263,7 @@ uint slot;
 
 uint bt_nextkey (BtDb *bt, uint slot)
 {
+BtLatchSet *set;
 BtPool *pool;
 BtPage page;
 uid right;
@@ -1990,7 +2273,7 @@ uid right;
        while( slot++ < bt->cursor->cnt )
          if( slotptr(bt->cursor,slot)->dead )
                continue;
-         else if( right || (slot < bt->cursor->cnt))
+         else if( right || (slot < bt->cursor->cnt) )
                return slot;
          else
                break;
@@ -1999,19 +2282,18 @@ uid right;
                break;
 
        bt->cursor_page = right;
-
        if( pool = bt_pinpool (bt, right) )
                page = bt_page (bt, pool, right);
        else
                return 0;
 
-       bt->set = bt_pinlatch (bt, right);
-    bt_lockpage(BtLockRead, bt->set);
+       set = bt_pinlatch (bt, right);
+    bt_lockpage(BtLockRead, set);
 
        memcpy (bt->cursor, page, bt->mgr->page_size);
 
-       bt_unlockpage(BtLockRead, bt->set);
-       bt_unpinlatch (bt->set);
+       bt_unlockpage(BtLockRead, set);
+       bt_unpinlatch (set);
        bt_unpinpool (pool);
        slot = 0;
   } while( 1 );
@@ -2034,6 +2316,7 @@ uint bt_tod(BtDb *bt, uint slot)
        return slotptr(bt->cursor,slot)->tod;
 }
 
+
 #ifdef STANDALONE
 
 void bt_latchaudit (BtDb *bt)
@@ -2105,6 +2388,7 @@ uid next, page_no = LEAF_page;    // start on first page of leaves
 unsigned char key[256];
 ThreadArg *args = arg;
 int ch, len = 0, slot;
+BtLatchSet *set;
 time_t tod[1];
 BtPool *pool;
 BtPage page;
@@ -2137,7 +2421,7 @@ FILE *in;
                          else if( args->num )
                                sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
 
-                         if( bt_insertkey (bt, key, len, 0, line, *tod) )
+                         if( bt_insertkey (bt, key, len, line, *tod, 0) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
@@ -2159,7 +2443,7 @@ FILE *in;
                          else if( args->num )
                                sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
 
-                         if( bt_deletekey (bt, key, len, 0) )
+                         if( bt_deletekey (bt, key, len) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
@@ -2214,17 +2498,17 @@ FILE *in;
                fprintf(stderr, "started reading\n");
 
                do {
-                       if( bt->pool = bt_pinpool (bt, page_no) )
-                               page = bt_page (bt, bt->pool, page_no);
+                       if( pool = bt_pinpool (bt, page_no) )
+                               page = bt_page (bt, pool, page_no);
                        else
                                break;
-                       bt->set = bt_pinlatch (bt, page_no);
-                       bt_lockpage (BtLockRead, bt->set);
+                       set = bt_pinlatch (bt, page_no);
+                       bt_lockpage (BtLockRead, set);
                        cnt += page->act;
                        next = bt_getid (page->right);
-                       bt_unlockpage (BtLockRead, bt->set);
-                       bt_unpinlatch (bt->set);
-                       bt_unpinpool (bt->pool);
+                       bt_unlockpage (BtLockRead, set);
+                       bt_unpinlatch (set);
+                       bt_unpinpool (pool);
                } while( page_no = next );
 
                cnt--;  // remove stopper key
index e5baf7b8f452967c81357c444c1af98d755c6f49..54e14bc3fac2b2db1cd431b73ef4d219a03c45ef 100644 (file)
@@ -1,5 +1,5 @@
 // btree version threads2j linux futex concurrency version
-// 24 JAN 2014
+// 29 JAN 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -154,11 +154,9 @@ typedef struct Page {
        uint act;                                       // count of active keys
        uint min;                                       // next key offset
        unsigned char bits;                     // page size in bits
-       unsigned char lvl:6;            // level of page
-       unsigned char kill:1;           // page is being deleted
+       unsigned char lvl:7;            // level of page
        unsigned char dirty:1;          // page has deleted keys
        unsigned char right[BtId];      // page number to right
-       BtSlot table[0];                        // array of key slots
 } *BtPage;
 
 //  hash table entries
@@ -249,6 +247,7 @@ typedef struct {
        BtLatchSet *set;        // current page latchset
        BtPool *pool;           // current page pool
        unsigned char *mem;     // frame, cursor, page memory buffer
+       int parent;                     // last loadpage was from a parent level
        int found;                      // last delete or insert was found
        int err;                        // last error
 } BtDb;
@@ -266,12 +265,17 @@ typedef enum {
 // B-Tree functions
 extern void bt_close (BtDb *bt);
 extern BtDb *bt_open (BtMgr *mgr);
-extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
-extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
+extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl);
+extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_nextkey   (BtDb *bt, uint slot);
 
+//     internal functions
+BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
+uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
+BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
+
 //     manager functions
 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
 void bt_mgrclose (BtMgr *mgr);
@@ -339,7 +343,7 @@ extern uint bt_tod (BtDb *bt, uint slot);
 
 //     Access macros to address slot and key values from the page
 
-#define slotptr(page, slot) (page->table + slot-1)
+#define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
 
 void bt_putid(unsigned char *dest, uid id)
@@ -1448,6 +1452,7 @@ BtLatchSet *set, *prevset;
 uint drill = 0xff, slot;
 uint mode, prevmode;
 BtPool *prevpool;
+int parent = 1;
 
   //  start at root of btree and drill down
 
@@ -1507,30 +1512,33 @@ BtPool *prevpool;
        //  find key on page at this level
        //  and descend to requested level
 
-       if( !bt->page->kill && (slot = bt_findslot (bt, key, len)) ) {
+       if( slot = bt_findslot (bt, key, len) ) {
          if( drill == lvl )
-               return slot;
+               return bt->parent = parent, slot;
 
          while( slotptr(bt->page, slot)->dead )
                if( slot++ < bt->page->cnt )
                        continue;
                else {
                        page_no = bt_getid(bt->page->right);
+                       parent = 0;
                        goto slideright;
                }
 
          page_no = bt_getid(slotptr(bt->page, slot)->id);
+         parent = 1;
          drill--;
        }
 
        //  or slide right into next page
-       //  (slide left from deleted page)
 
-       else
+       else {
                page_no = bt_getid(bt->page->right);
+               parent = 0;
+       }
 
        //  continue down / right using overlapping locks
-       //  to protect pages being killed or split.
+       //  to protect pages being split.
 
 slideright:
        prevpage = bt->page_no;
@@ -1545,119 +1553,384 @@ slideright:
   return 0;    // return error
 }
 
-//  find and delete key on page by marking delete flag bit
-//  when page becomes empty, delete it
+//  remove empty page from the B-tree
+//     by pulling our right node left over ourselves
+
+//  call with bt->page, etc, set to page's locked parent
+//     returns with page locked.
 
-BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
+BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
 {
-unsigned char lowerkey[256], higherkey[256];
-BtLatchSet *rset, *set;
-BtPool *pool, *rpool;
-uid page_no, right;
-uint slot, tod;
-BtPage rpage;
+BtLatchSet *rset, *pset, *rpset;
+BtPool *rpool, *ppool, *rppool;
+BtPage rpage, ppage, rppage;
+uid right, parent, rparent;
 BtKey ptr;
+uint idx;
 
-       if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
-               ptr = keyptr(bt->page, slot);
+       //      cache node's parent page
+
+       parent = bt->page_no;
+       ppage = bt->page;
+       ppool = bt->pool;
+       pset = bt->set;
+
+       // lock and map our right page
+       //      it cannot be NULL because of the stopper
+       //      in the last right page
+
+       bt_lockpage (BtLockWrite, set);
+
+       // if we aren't dead yet
+
+       if( page->act )
+               goto rmergexit;
+
+       if( right = bt_getid (page->right) )
+         if( rpool = bt_pinpool (bt, right) )
+               rpage = bt_page (bt, rpool, right);
+         else
+               return bt->err;
        else
+               return bt->err = BTERR_struct;
+
+       rset = bt_pinlatch (bt, right);
+
+       //      find our right neighbor
+
+       if( ppage->act > 1 ) {
+        for( idx = slot; idx++ < ppage->cnt; )
+         if( !slotptr(ppage, idx)->dead )
+               break;
+
+        if( idx > ppage->cnt )
+               return bt->err = BTERR_struct;
+
+        //  redirect right neighbor in parent to left node
+
+        bt_putid(slotptr(ppage,idx)->id, page_no);
+       }
+
+       //      if parent has only our deleted page, e.g. no right neighbor
+       //      prepare to merge parent itself
+
+       if( ppage->act == 1 ) {
+         if( rparent = bt_getid (ppage->right) )
+          if( rppool = bt_pinpool (bt, rparent) )
+               rppage = bt_page (bt, rppool, rparent);
+          else
                return bt->err;
+         else
+               return bt->err = BTERR_struct;
 
-       // if key is found delete it, otherwise ignore request
+         rpset = bt_pinlatch (bt, rparent);
+         bt_lockpage (BtLockWrite, rpset);
 
-       if( bt->found = !keycmp (ptr, key, len) )
-               if( bt->found = slotptr(bt->page, slot)->dead == 0 ) {
-                       slotptr(bt->page,slot)->dead = 1;
-                       if( slot < bt->page->cnt )
-                               bt->page->dirty = 1;
-                       bt->page->act--;
+         // find our right neighbor on right parent page
+
+         for( idx = 0; idx++ < rppage->cnt; )
+               if( !slotptr(rppage, idx)->dead ) {
+                 bt_putid (slotptr(rppage, idx)->id, page_no);
+                 break;
                }
 
-       // return if page is not empty, or it has no right sibling
+         if( idx > rppage->cnt )
+               return bt->err = BTERR_struct;
+       }
 
-       right = bt_getid(bt->page->right);
-       page_no = bt->page_no;
-       pool = bt->pool;
-       set = bt->set;
+       //      now that there are no more pointers to our right node
+       //      we can wait for delete lock on it
 
-       if( !right || bt->page->act ) {
-               bt_unlockpage(BtLockWrite, set);
-               bt_unpinlatch (set);
-               bt_unpinpool (pool);
+       bt_lockpage(BtLockDelete, rset);
+       bt_lockpage(BtLockWrite, rset);
+
+       // pull contents of right page into our empty page 
+
+       memcpy (page, rpage, bt->mgr->page_size);
+
+       // ready to release right parent lock
+       //      now that we have a new page in place
+
+       if( ppage->act == 1 ) {
+         bt_unlockpage (BtLockWrite, rpset);
+         bt_unpinlatch (rpset);
+         bt_unpinpool (rppool);
+       }
+
+       //      add killed right block to free chain
+       //      lock latch mgr
+
+       bt_spinwritelock(bt->mgr->latchmgr->lock, 0);
+
+       //      store free chain in allocation page second right
+
+       bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+       bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+
+       // unlock latch mgr and right page
+
+       bt_unlockpage(BtLockDelete, rset);
+       bt_unlockpage(BtLockWrite, rset);
+       bt_unpinlatch (rset);
+       bt_unpinpool (rpool);
+
+       bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0);
+
+       // delete our obsolete fence key from our parent
+
+       slotptr(ppage, slot)->dead = 1;
+       ppage->dirty = 1;
+
+       //      if our parent now empty
+       //      remove it from the tree
+
+       if( ppage->act-- == 1 )
+         if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
                return bt->err;
+
+rmergexit:
+       bt_unlockpage (BtLockWrite, pset);
+       bt_unpinlatch (pset);
+       bt_unpinpool (ppool);
+
+       bt->found = 1;
+       return bt->err = 0;
+}
+
+//  remove empty page from the B-tree
+//     try merging left first.  If no left
+//     sibling, then merge right.
+
+//     call with page loaded and locked,
+//  return with page locked.
+
+BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
+{
+unsigned char fencekey[256], postkey[256];
+uint slot, idx, postfence = 0;
+BtLatchSet *lset, *pset;
+BtPool *lpool, *ppool;
+BtPage lpage, ppage;
+uid left, parent;
+BtKey ptr;
+
+       ptr = keyptr(page, page->cnt);
+       memcpy(fencekey, ptr, ptr->len + 1);
+       bt_unlockpage (BtLockWrite, set);
+
+       //      load and lock our parent
+
+retry:
+       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
+               return bt->err;
+
+       parent = bt->page_no;
+       ppage = bt->page;
+       ppool = bt->pool;
+       pset = bt->set;
+
+       //      wait until we are posted in our parent
+
+       if( !bt->parent ) {
+               bt_unlockpage (BtLockWrite, pset);
+               bt_unpinlatch (pset);
+               bt_unpinpool (ppool);
+#ifdef unix
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               goto retry;
        }
 
-       // obtain Parent lock over write lock
+       //  find our left neighbor in our parent page
 
-       bt_lockpage(BtLockParent, set);
+       for( idx = slot; --idx; )
+         if( !slotptr(ppage, idx)->dead )
+               break;
 
-       // keep copy of key to delete
+       //      if no left neighbor, do right merge
 
-       ptr = keyptr(bt->page, bt->page->cnt);
-       memcpy(lowerkey, ptr, ptr->len + 1);
+       if( !idx )
+               return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
 
-       // lock and map right page
+       // lock and map our left neighbor's page
 
-       if( rpool = bt_pinpool (bt, right) )
-               rpage = bt_page (bt, rpool, right);
+       left = bt_getid (slotptr(ppage, idx)->id);
+
+       if( lpool = bt_pinpool (bt, left) )
+               lpage = bt_page (bt, lpool, left);
        else
                return bt->err;
 
-       rset = bt_pinlatch (bt, right);
-       bt_lockpage(BtLockWrite, rset);
+       lset = bt_pinlatch (bt, left);
+       bt_lockpage(BtLockWrite, lset);
 
-       // pull contents of next page into current empty page 
+       //  wait until sibling is in our parent
 
-       memcpy (bt->page, rpage, bt->mgr->page_size);
+       if( bt_getid (lpage->right) != page_no ) {
+               bt_unlockpage (BtLockWrite, pset);
+               bt_unpinlatch (pset);
+               bt_unpinpool (ppool);
+               bt_unlockpage (BtLockWrite, lset);
+               bt_unpinlatch (lset);
+               bt_unpinpool (lpool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               goto retry;
+       }
 
-       //      keep copy of key to update
+       //  since our page will have no more pointers to it,
+       //      obtain Delete lock and wait for write locks to clear
 
-       ptr = keyptr(rpage, rpage->cnt);
-       memcpy(higherkey, ptr, ptr->len + 1);
+       bt_lockpage(BtLockDelete, set);
+       bt_lockpage(BtLockWrite, set);
 
-       //  Mark right page as deleted and point it to left page
-       //      until we can post updates at higher level.
+       //      if we aren't dead yet,
+       //      get ready for exit
 
-       bt_putid(rpage->right, page_no);
-       rpage->kill = 1;
-       rpage->cnt = 0;
+       if( page->act ) {
+               bt_unlockpage(BtLockDelete, set);
+               bt_unlockpage(BtLockWrite, lset);
+               bt_unpinlatch (lset);
+               bt_unpinpool (lpool);
+               goto lmergexit;
+       }
 
-       bt_unlockpage(BtLockWrite, rset);
-       bt_unlockpage(BtLockWrite, set);
+       //      are we are the fence key for our parent?
+       //      if so, grab our old fence key
 
-       //  delete old lower key to consolidated node
+       if( postfence = slot == ppage->cnt ) {
+               ptr = keyptr (ppage, ppage->cnt);
+               memcpy(fencekey, ptr, ptr->len + 1);
+               memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
 
-       if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
-               return bt->err;
+               // clear out other dead slots
 
-       //  redirect higher key directly to consolidated node
+               while( --ppage->cnt )
+                 if( slotptr(ppage, ppage->cnt)->dead )
+                       memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
+                 else
+                       break;
 
-       tod = (uint)time(NULL);
+               ptr = keyptr (ppage, ppage->cnt);
+               memcpy(postkey, ptr, ptr->len + 1);
+       } else
+               slotptr(ppage,slot)->dead = 1;
 
-       if( bt_insertkey (bt, higherkey+1, *higherkey, lvl + 1, page_no, tod) )
-               return bt->err;
+       ppage->dirty = 1;
+       ppage->act--;
 
-       //      add killed right block to free chain
+       //      push our right neighbor pointer to our left
+
+       memcpy (lpage->right, page->right, BtId);
+
+       //      add ourselves to free chain
        //      lock latch mgr
 
        bt_spinwritelock(bt->mgr->latchmgr->lock, 0);
 
        //      store free chain in allocation page second right
-       bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
-       bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+       bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+       bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
 
-       // unlock latch mgr and unpin right page
+       // unlock latch mgr and pages
 
        bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0);
-       bt_unpinlatch (rset);
-       bt_unpinpool (rpool);
+       bt_unlockpage(BtLockWrite, lset);
+       bt_unpinlatch (lset);
+       bt_unpinpool (lpool);
+
+       // release our node's delete lock
+
+       bt_unlockpage(BtLockDelete, set);
+
+lmergexit:
+       bt_unlockpage (BtLockWrite, pset);
+       bt_unpinpool (ppool);
+
+       //  do we need to post parent's fence key in its parent?
+
+       if( !postfence || parent == ROOT_page ) {
+               bt_unpinlatch (pset);
+               bt->found = 1;
+               return bt->err = 0;
+       }
+
+       //      interlock parent fence post
 
-       //      remove ParentModify lock
+       bt_lockpage (BtLockParent, pset);
 
-       bt_unlockpage(BtLockParent, set);
+       //      load parent's parent page
+posttry:
+       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
+               return bt->err;
+
+       if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
+         if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+               return bt->err;
+         else
+               goto posttry;
+
+       page = bt->page;
+
+       page->min -= *postkey + 1;
+       ((unsigned char *)page)[page->min] = *postkey;
+       memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
+       slotptr(page, slot)->off = page->min;
+
+       bt_unlockpage (BtLockParent, pset);
+       bt_unpinlatch (pset);
+
+       bt_unlockpage (BtLockWrite, bt->set);
+       bt_unpinlatch (bt->set);
+       bt_unpinpool (bt->pool);
+
+       bt->found = 1;
+       return bt->err = 0;
+}
+
+//  find and delete key on page by marking delete flag bit
+//  if page becomes empty, delete it from the btree
+
+BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
+{
+BtLatchSet *set;
+BtPool *pool;
+BtPage page;
+uid page_no;
+BtKey ptr;
+uint slot;
+
+       if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
+               return bt->err;
+
+       page_no = bt->page_no;
+       page = bt->page;
+       pool = bt->pool;
+       set = bt->set;
+
+       // if key is found delete it, otherwise ignore request
+
+       ptr = keyptr(page, slot);
+
+       if( bt->found = !keycmp (ptr, key, len) )
+         if( bt->found = slotptr(page, slot)->dead == 0 ) {
+               slotptr(page,slot)->dead = 1;
+                 if( slot < page->cnt )
+                       page->dirty = 1;
+                 if( !--page->act )
+                       if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
+                         return bt->err;
+               }
+
+       bt_unlockpage(BtLockWrite, set);
        bt_unpinlatch (set);
        bt_unpinpool (pool);
-       return 0;
+       return bt->err = 0;
 }
 
 //     find key in leaf level and return row-id
@@ -1676,7 +1949,7 @@ uid id;
        // if key exists, return row-id
        //      otherwise return 0
 
-       if( ptr->len == len && !memcmp (ptr->key, key, len) )
+       if( slot <= bt->page->cnt && !keycmp (ptr, key, len) )
                id = bt_getid(slotptr(bt->page,slot)->id);
        else
                id = 0;
@@ -1689,16 +1962,15 @@ uid id;
 
 //     check page for space available,
 //     clean if necessary and return
-//     =0 - page needs splitting
-//     >0 - go ahead at returned slot
+//     0 - page needs splitting
+//     >0  new slot value
 
-uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
+uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
 {
 uint nxt = bt->mgr->page_size;
-BtPage page = bt->page;
 uint cnt = 0, idx = 0;
 uint max = page->cnt;
-uint newslot;
+uint newslot = max;
 BtKey key;
 
        if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
@@ -1717,15 +1989,19 @@ BtKey key;
        page->dirty = 0;
        page->act = 0;
 
-       // always leave fence key in list
+       // try cleaning up page first
+
+       // always leave fence key in the array
+       // otherwise, remove deleted key
 
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 1;
-               else if( cnt < max && slotptr(bt->frame,cnt)->dead )
+               if( cnt < max && slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy key
+
                key = keyptr(bt->frame, cnt);
                nxt -= key->len + 1;
                memcpy ((unsigned char *)page + nxt, key, key->len + 1);
@@ -1737,26 +2013,59 @@ BtKey key;
                slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
                slotptr(page, idx)->off = nxt;
        }
+
        page->min = nxt;
        page->cnt = idx;
 
+       //      see if page has enough space now, or does it need splitting?
+
        if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
                return newslot;
 
        return 0;
 }
 
-// split the root and raise the height of the btree
+//     add key to current page
+//     page must already be writelocked
 
-BTERR bt_splitroot(BtDb *bt,  unsigned char *newkey, unsigned char *oldkey, uid page_no2)
+void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
+{
+uint idx;
+
+       // find next available dead slot and copy key onto page
+
+       for( idx = slot; idx < page->cnt; idx++ )
+         if( slotptr(page, idx)->dead )
+               break;
+
+       if( idx == page->cnt )
+               idx++, page->cnt++;
+
+       page->act++;
+
+       // now insert key into array before slot
+
+       while( idx > slot )
+               *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
+
+       page->min -= len + 1;
+       ((unsigned char *)page)[page->min] = len;
+       memcpy ((unsigned char *)page + page->min +1, key, len );
+
+       bt_putid(slotptr(page,slot)->id, id);
+       slotptr(page, slot)->off = page->min;
+       slotptr(page, slot)->tod = tod;
+       slotptr(page, slot)->dead = 0;
+}
+
+BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
 {
 uint nxt = bt->mgr->page_size;
 BtPage root = bt->page;
 uid new_page;
 
        //  Obtain an empty page to use, and copy the current
-       //  root contents into it which is the lower half of
-       //      the old root.
+       //  root contents into it
 
        if( !(new_page = bt_newpage(bt, root)) )
                return bt->err;
@@ -1768,16 +2077,17 @@ uid new_page;
 
        // insert first key on newroot page
 
-       nxt -= *newkey + 1;
-       memcpy ((unsigned char *)root + nxt, newkey, *newkey + 1);
+       nxt -= *leftkey + 1;
+       memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
        bt_putid(slotptr(root, 1)->id, new_page);
        slotptr(root, 1)->off = nxt;
        
-       // insert second key on newroot page
+       // insert second key (stopper key) on newroot page
        // and increase the root height
 
-       nxt -= *oldkey + 1;
-       memcpy ((unsigned char *)root + nxt, oldkey, *oldkey + 1);
+       nxt -= 3;
+       *((unsigned char *)root + nxt) = 2;
+       memset ((unsigned char *)root + nxt + 1, 0xff, 2);
        bt_putid(slotptr(root, 2)->id, page_no2);
        slotptr(root, 2)->off = nxt;
 
@@ -1796,31 +2106,26 @@ uid new_page;
 }
 
 //  split already locked full node
-//     return unlocked.
+//     return unlocked and unpinned.
 
-BTERR bt_splitpage (BtDb *bt)
+BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
 {
-uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
-unsigned char oldkey[256], lowerkey[256];
-uid page_no = bt->page_no, right;
-BtLatchSet *nset, *set = bt->set;
-BtPool *pool = bt->pool;
-BtPage page = bt->page;
+uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
+unsigned char rightkey[256], leftkey[256];
+uint tod = time(NULL);
 uint lvl = page->lvl;
 uid new_page;
 BtKey key;
-uint tod;
-
-       //  split higher half of keys to bt->frame
-       //      the last key (fence key) might be dead
 
-       tod = (uint)time(NULL);
+       //      initialize frame buffer for right node
 
        memset (bt->frame, 0, bt->mgr->page_size);
-       max = (int)page->cnt;
+       max = page->cnt;
        cnt = max / 2;
        idx = 0;
 
+       //  split higher half of keys to bt->frame
+
        while( cnt++ < max ) {
                key = keyptr(page, cnt);
                nxt -= key->len + 1;
@@ -1832,166 +2137,142 @@ uint tod;
                slotptr(bt->frame, idx)->off = nxt;
        }
 
-       // remember existing fence key for new page to the right
+       // transfer right link node to new right node
 
-       memcpy (oldkey, key, key->len + 1);
+       if( page_no > ROOT_page )
+               memcpy (bt->frame->right, page->right, BtId);
 
        bt->frame->bits = bt->mgr->page_bits;
        bt->frame->min = nxt;
        bt->frame->cnt = idx;
        bt->frame->lvl = lvl;
 
-       // link right node
-
-       if( page_no > ROOT_page ) {
-               right = bt_getid (page->right);
-               bt_putid(bt->frame->right, right);
-       }
-
-       //      get new free page and write frame to it.
+       //      get new free page and write right frame to it.
 
        if( !(new_page = bt_newpage(bt, bt->frame)) )
                return bt->err;
 
+       //      remember fence key for new right page to add
+       //      as right sibling to the left node
+
+       key = keyptr(bt->frame, idx);
+       memcpy (rightkey, key, key->len + 1);
+
        //      update lower keys to continue in old page
 
        memcpy (bt->frame, page, bt->mgr->page_size);
        memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
        nxt = bt->mgr->page_size;
+       page->dirty = 0;
        page->act = 0;
        cnt = 0;
        idx = 0;
 
        //  assemble page of smaller keys
-       //      (they're all active keys)
+       //      to remain in the old page
 
        while( cnt++ < max / 2 ) {
                key = keyptr(bt->frame, cnt);
                nxt -= key->len + 1;
                memcpy ((unsigned char *)page + nxt, key, key->len + 1);
-               memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
+               memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
+               if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
+                       page->act++;
                slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
                slotptr(page, idx)->off = nxt;
-               page->act++;
        }
 
-       // remember fence key for old page
+       // finalize left page and save fence key
 
-       memcpy(lowerkey, key, key->len + 1);
-       bt_putid(page->right, new_page);
+       memcpy(leftkey, key, key->len + 1);
        page->min = nxt;
        page->cnt = idx;
 
+       //      link new right page
+
+       bt_putid (page->right, new_page);
+
        // if current page is the root page, split it
 
        if( page_no == ROOT_page )
-               return bt_splitroot (bt, lowerkey, oldkey, new_page);
-
-       // obtain Parent/Write locks
-       // for left and right node pages
+               return bt_splitroot (bt, leftkey, new_page);
 
-       nset = bt_pinlatch (bt, new_page);
+       //  obtain ParentModification lock for current page
 
-       bt_lockpage (BtLockParent, nset);
        bt_lockpage (BtLockParent, set);
 
-       //  release wr lock on left page
-       //  (keep the SMO in sequence)
+       //  release wr lock on our page.
+       //      this will keep out another SMO
 
        bt_unlockpage (BtLockWrite, set);
 
-       // insert new fence for reformulated left block
+       //  insert key for old page (lower keys)
 
-       if( bt_insertkey (bt, lowerkey+1, *lowerkey, lvl + 1, page_no, tod) )
+       if( bt_insertkey (bt, leftkey + 1, *leftkey, page_no, tod, lvl + 1) )
                return bt->err;
 
-       // fix old fence for newly allocated right block page
+       //      switch old parent key from us to our right page
 
-       if( bt_insertkey (bt, oldkey+1, *oldkey, lvl + 1, new_page, tod) )
+       if( bt_insertkey (bt, rightkey + 1, *rightkey, new_page, tod, lvl + 1) )
                return bt->err;
 
-       // release Parent locks
+       //      unlock and unpin
 
-       bt_unlockpage (BtLockParent, nset);
        bt_unlockpage (BtLockParent, set);
-       bt_unpinlatch (nset);
        bt_unpinlatch (set);
        bt_unpinpool (pool);
        return 0;
 }
 
-//  Insert new key into the btree at requested level.
-//  Level zero pages are leaf pages. Page is unlocked at exit.
+//  Insert new key into the btree at given level.
 
-BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
+BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
 {
 uint slot, idx;
 BtPage page;
 BtKey ptr;
 
-  while( 1 ) {
-       if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
-               ptr = keyptr(bt->page, slot);
-       else
-       {
-               if ( !bt->err )
-                       bt->err = BTERR_ovflw;
-               return bt->err;
-       }
-
-       // if key already exists, update id and return
-
-       page = bt->page;
-
-       if( bt->found = !keycmp (ptr, key, len) ) {
-               slotptr(page, slot)->dead = 0;
-               slotptr(page, slot)->tod = tod;
-               bt_putid(slotptr(page,slot)->id, id);
-               bt_unlockpage(BtLockWrite, bt->set);
-               bt_unpinlatch(bt->set);
-               bt_unpinpool (bt->pool);
-               return bt->err;
-       }
-
-       // check if page has enough space
-
-       if( slot = bt_cleanpage (bt, len, slot) )
-               break;
-
-       if( bt_splitpage (bt) )
-               return bt->err;
-  }
-
-  // calculate next available slot and copy key into page
+       while( 1 ) {
+               if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
+                       ptr = keyptr(bt->page, slot);
+               else
+               {
+                       if ( !bt->err )
+                               bt->err = BTERR_ovflw;
+                       return bt->err;
+               }
 
-  page->min -= len + 1; // reset lowest used offset
-  ((unsigned char *)page)[page->min] = len;
-  memcpy ((unsigned char *)page + page->min +1, key, len );
+               // if key already exists, update id and return
 
-  for( idx = slot; idx < page->cnt; idx++ )
-       if( slotptr(page, idx)->dead )
-               break;
+               page = bt->page;
 
-  // now insert key into array before slot
-  // preserving the fence slot
+               if( !keycmp (ptr, key, len) ) {
+                       if( slotptr(page, slot)->dead )
+                               page->act++;
+                       slotptr(page, slot)->dead = 0;
+                       slotptr(page, slot)->tod = tod;
+                       bt_putid(slotptr(page,slot)->id, id);
+                       bt_unlockpage(BtLockWrite, bt->set);
+                       bt_unpinlatch (bt->set);
+                       bt_unpinpool (bt->pool);
+                       return bt->err;
+               }
 
-  if( idx == page->cnt )
-       idx++, page->cnt++;
+               // check if page has enough space
 
-  page->act++;
+               if( slot = bt_cleanpage (bt, bt->page, len, slot) )
+                       break;
 
-  while( idx > slot )
-       *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
+               if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+                       return bt->err;
+       }
 
-  bt_putid(slotptr(page,slot)->id, id);
-  slotptr(page, slot)->off = page->min;
-  slotptr(page, slot)->tod = tod;
-  slotptr(page, slot)->dead = 0;
+       bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
 
-  bt_unlockpage (BtLockWrite, bt->set);
-  bt_unpinlatch (bt->set);
-  bt_unpinpool (bt->pool);
-  return 0;
+       bt_unlockpage (BtLockWrite, bt->set);
+       bt_unpinlatch (bt->set);
+       bt_unpinpool (bt->pool);
+       return 0;
 }
 
 //  cache page of keys into cursor and return starting slot for given key
@@ -2003,7 +2284,9 @@ uint slot;
        // cache page for retrieval
        if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
                memcpy (bt->cursor, bt->page, bt->mgr->page_size);
+
        bt->cursor_page = bt->page_no;
+
        bt_unlockpage(BtLockRead, bt->set);
        bt_unpinlatch (bt->set);
        bt_unpinpool (bt->pool);
@@ -2015,6 +2298,7 @@ uint slot;
 
 uint bt_nextkey (BtDb *bt, uint slot)
 {
+BtLatchSet *set;
 BtPool *pool;
 BtPage page;
 uid right;
@@ -2024,7 +2308,7 @@ uid right;
        while( slot++ < bt->cursor->cnt )
          if( slotptr(bt->cursor,slot)->dead )
                continue;
-         else if( right || (slot < bt->cursor->cnt))
+         else if( right || (slot < bt->cursor->cnt) )
                return slot;
          else
                break;
@@ -2033,19 +2317,18 @@ uid right;
                break;
 
        bt->cursor_page = right;
-
        if( pool = bt_pinpool (bt, right) )
                page = bt_page (bt, pool, right);
        else
                return 0;
 
-       bt->set = bt_pinlatch (bt, right);
-    bt_lockpage(BtLockRead, bt->set);
+       set = bt_pinlatch (bt, right);
+    bt_lockpage(BtLockRead, set);
 
        memcpy (bt->cursor, page, bt->mgr->page_size);
 
-       bt_unlockpage(BtLockRead, bt->set);
-       bt_unpinlatch (bt->set);
+       bt_unlockpage(BtLockRead, set);
+       bt_unpinlatch (set);
        bt_unpinpool (pool);
        slot = 0;
   } while( 1 );
@@ -2068,6 +2351,7 @@ uint bt_tod(BtDb *bt, uint slot)
        return slotptr(bt->cursor,slot)->tod;
 }
 
+
 #ifdef STANDALONE
 
 void bt_latchaudit (BtDb *bt)
@@ -2139,6 +2423,7 @@ uid next, page_no = LEAF_page;    // start on first page of leaves
 unsigned char key[256];
 ThreadArg *args = arg;
 int ch, len = 0, slot;
+BtLatchSet *set;
 time_t tod[1];
 BtPool *pool;
 BtPage page;
@@ -2171,7 +2456,7 @@ FILE *in;
                          else if( args->num )
                                sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
 
-                         if( bt_insertkey (bt, key, len, 0, line, *tod) )
+                         if( bt_insertkey (bt, key, len, line, *tod, 0) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
@@ -2193,7 +2478,7 @@ FILE *in;
                          else if( args->num )
                                sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
 
-                         if( bt_deletekey (bt, key, len, 0) )
+                         if( bt_deletekey (bt, key, len) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
@@ -2248,17 +2533,17 @@ FILE *in;
                fprintf(stderr, "started reading\n");
 
                do {
-                       if( bt->pool = bt_pinpool (bt, page_no) )
-                               page = bt_page (bt, bt->pool, page_no);
+                       if( pool = bt_pinpool (bt, page_no) )
+                               page = bt_page (bt, pool, page_no);
                        else
                                break;
-                       bt->set = bt_pinlatch (bt, page_no);
-                       bt_lockpage (BtLockRead, bt->set);
+                       set = bt_pinlatch (bt, page_no);
+                       bt_lockpage (BtLockRead, set);
                        cnt += page->act;
                        next = bt_getid (page->right);
-                       bt_unlockpage (BtLockRead, bt->set);
-                       bt_unpinlatch (bt->set);
-                       bt_unpinpool (bt->pool);
+                       bt_unlockpage (BtLockRead, set);
+                       bt_unpinlatch (set);
+                       bt_unpinpool (pool);
                } while( page_no = next );
 
                cnt--;  // remove stopper key