]> pd.if.org Git - btree/commitdiff
Finish rework of bt_deletekey, w/bug fixes
authorunknown <karl@E04.petzent.com>
Wed, 12 Feb 2014 23:13:24 +0000 (15:13 -0800)
committerunknown <karl@E04.petzent.com>
Wed, 12 Feb 2014 23:13:24 +0000 (15:13 -0800)
threads2h.c
threads2i.c
threads2j.c

index f1f774d23125ebc19fccedd78285e4a1617373ff..bb8c2363c850d32f4883bfe1d181a10b9abd7d76 100644 (file)
@@ -1,5 +1,6 @@
 // btree version threads2h pthread rw lock version
-// 30 JAN 2014
+//     with reworked bt_deletekey code
+// 12 FEB 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -49,6 +50,7 @@ REDISTRIBUTION OF THIS SOFTWARE.
 
 #include <memory.h>
 #include <string.h>
+#include <stddef.h>
 
 typedef unsigned long long     uid;
 
@@ -169,14 +171,19 @@ typedef struct {
 //     It is immediately followed
 //     by the BtSlot array of keys.
 
-typedef struct Page {
+typedef struct BtPage_ {
        uint cnt;                                       // count of keys in page
        uint act;                                       // count of active keys
        uint min;                                       // next key offset
-       unsigned char bits;                     // page size in bits
-       unsigned char lvl:7;            // level of page
+       unsigned char bits:7;           // page size in bits
+       unsigned char free:1;           // page is on free list
+       unsigned char lvl:4;            // level of page
+       unsigned char kill:1;           // page is being killed
        unsigned char dirty:1;          // page has deleted keys
+       unsigned char posted:1;         // page fence is posted
+       unsigned char goright:1;        // page is being killed, continue to right
        unsigned char right[BtId];      // page number to right
+       unsigned char fence[256];       // page fence key
 } *BtPage;
 
 //     The memory mapping pool table buffer manager entry
@@ -194,10 +201,19 @@ typedef struct {
 #endif
 } BtPool;
 
+//  The loadpage interface object
+
+typedef struct {
+       uid page_no;            // current page number
+       BtPage page;            // current page pointer
+       BtPool *pool;           // current page pool
+       BtLatchSet *latch;      // current page latch set
+} BtPageSet;
+
 //     structure for latch manager on ALLOC_page
 
 typedef struct {
-       struct Page alloc[2];           // next & free page_nos in right ptr
+       struct BtPage_ alloc[2];        // next & free page_nos in right ptr
        BtSpinLatch lock[1];            // allocation area lite latch
        ushort latchdeployed;           // highest number of latch entries deployed
        ushort nlatchpage;                      // number of latch pages at BT_latch
@@ -215,7 +231,6 @@ typedef struct {
        uint seg_bits;                          // seg size in pages in bits
        uint mode;                                      // read-write mode
 #ifdef unix
-       char *pooladvise;                       // bit maps for pool page advisements
        int idx;
 #else
        HANDLE idx;
@@ -240,13 +255,8 @@ typedef struct {
        BtPage cursor;          // cached frame for start/next (never mapped)
        BtPage frame;           // spare frame for the page split (never mapped)
        BtPage zero;            // page frame for zeroes at end of file
-       BtPage page;            // current page
-       uid page_no;            // current page number  
        uid cursor_page;        // current cursor page number   
-       BtLatchSet *set;        // current page latch set
-       BtPool *pool;           // current page pool
        unsigned char *mem;     // frame, cursor, page memory buffer
-       int parent;                     // last loadpage was from a parent level
        int found;                      // last delete or insert was found
        int err;                        // last error
 } BtDb;
@@ -271,9 +281,7 @@ extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_nextkey   (BtDb *bt, uint slot);
 
 //     internal functions
-BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
-uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
-BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
+BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence);
 
 //     manager functions
 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
@@ -319,8 +327,8 @@ extern uint bt_tod (BtDb *bt, uint slot);
 //     one with two keys.
 
 //     Deleted keys are marked with a dead bit until
-//     page cleanup The fence key for a node is always
-//     present, even after deletion and cleanup.
+//     page cleanup The fence key for a node is
+//     present in a special array.
 
 //  Groups of pages called segments from the btree are optionally
 //  cached with a memory mapped pool. A hash table is used to keep
@@ -335,12 +343,13 @@ extern uint bt_tod (BtDb *bt, uint slot);
 //  Page 0 is dedicated to lock for new page extensions,
 //     and chains empty pages together for reuse.
 
-//     The ParentModification lock on a node is obtained to prevent resplitting
-//     or deleting a node before its fence is posted into its upper level.
+//     The ParentModification lock on a node is obtained to serialize posting
+//     or changing the fence key for a node.
 
 //     Empty pages are chained together through the ALLOC page and reused.
 
-//     Access macros to address slot and key values from the page
+//     Access macros to address slot and key values from the page.
+//     Page slots use 1 based indexing.
 
 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
@@ -556,19 +565,12 @@ void bt_releaseread(BtLatch *latch)
 #endif
 }
 
-void bt_initlockset (BtLatchSet *set, int reuse)
+void bt_initlockset (BtLatchSet *set)
 {
 #ifdef unix
 pthread_rwlockattr_t rwattr[1];
 
-       if( reuse ) {
-               pthread_rwlock_destroy (set->readwr->lock);
-               pthread_rwlock_destroy (set->access->lock);
-               pthread_rwlock_destroy (set->parent->lock);
-       }
-
        pthread_rwlockattr_init (rwattr);
-       pthread_rwlockattr_setkind_np (rwattr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
        pthread_rwlockattr_setpshared (rwattr, PTHREAD_PROCESS_SHARED);
 
        pthread_rwlock_init (set->readwr->lock, rwattr);
@@ -682,7 +684,7 @@ BtLatchSet *set;
 #else
                _InterlockedIncrement16 (&set->pin);
 #endif
-               bt_initlockset (set, 0);
+               bt_initlockset (set);
                bt_latchlink (bt, hashidx, victim, page_no);
                bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
                return set;
@@ -747,7 +749,6 @@ BtLatchSet *set;
 #else
        _InterlockedIncrement16 (&set->pin);
 #endif
-       bt_initlockset (set, 1);
        bt_latchlink (bt, hashidx, victim, page_no);
        bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
        bt_spinreleasewrite (set->busy);
@@ -790,7 +791,6 @@ uint slot;
        free (mgr->pool);
        free (mgr->hash);
        free (mgr->latch);
-       free (mgr->pooladvise);
        free (mgr);
 #else
        FlushFileBuffers(mgr->idx);
@@ -829,7 +829,6 @@ BtLatchMgr *latchmgr;
 off64_t size;
 uint amt[1];
 BtMgr* mgr;
-BtKey key;
 int flag;
 
 #ifndef unix
@@ -923,7 +922,6 @@ SYSTEM_INFO sysinfo[1];
        mgr->pool = calloc (poolmax, sizeof(BtPool));
        mgr->hash = calloc (hashsize, sizeof(ushort));
        mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
-       mgr->pooladvise = calloc (poolmax, (mgr->poolmask + 8) / 8);
 #else
        mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
        mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
@@ -970,13 +968,12 @@ SYSTEM_INFO sysinfo[1];
        latchmgr->alloc->bits = mgr->page_bits;
 
        for( lvl=MIN_lvl; lvl--; ) {
-               slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3;
+               slotptr(latchmgr->alloc, 1)->off = offsetof(struct BtPage_, fence);
                bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
-               key = keyptr(latchmgr->alloc, 1);
-               key->len = 2;                   // create stopper key
-               key->key[0] = 0xff;
-               key->key[1] = 0xff;
-               latchmgr->alloc->min = mgr->page_size - 3;
+               latchmgr->alloc->fence[0] = 2;
+               latchmgr->alloc->fence[1] = 0xff;
+               latchmgr->alloc->fence[2] = 0xff;
+               latchmgr->alloc->min = mgr->page_size;
                latchmgr->alloc->lvl = lvl;
                latchmgr->alloc->cnt = 1;
                latchmgr->alloc->act = 1;
@@ -1169,12 +1166,9 @@ int flag;
 
 #ifdef unix
        flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
-       pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
+       pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED | MAP_POPULATE, bt->mgr->idx, off);
        if( pool->map == MAP_FAILED )
                return bt->err = BTERR_map;
-
-       // clear out madvise issued bits
-       memset (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8) / 8), 0, (bt->mgr->poolmask + 8)/8);
 #else
        flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
        pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
@@ -1197,17 +1191,6 @@ uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
 BtPage page;
 
        page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
-#ifdef unix
-       {
-       uint idx = subpage / 8;
-       uint bit = subpage % 8;
-
-               if( ~((bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] >> bit) & 1 ) {
-                 madvise (page, bt->mgr->page_size, MADV_WILLNEED);
-                 (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] |= 1 << bit;
-               }
-       }
-#endif
        return page;
 }
 
@@ -1415,10 +1398,8 @@ void bt_unlockpage(BtLock mode, BtLatchSet *set)
 
 uid bt_newpage(BtDb *bt, BtPage page)
 {
-BtLatchSet *set;
-BtPool *pool;
+BtPageSet set[1];
 uid new_page;
-BtPage pmap;
 int reuse;
 
        //      lock allocation page
@@ -1429,12 +1410,13 @@ int reuse;
        // else allocate empty page
 
        if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
-               if( pool = bt_pinpool (bt, new_page) )
-                       pmap = bt_page (bt, pool, new_page);
+               if( set->pool = bt_pinpool (bt, new_page) )
+                       set->page = bt_page (bt, set->pool, new_page);
                else
                        return 0;
-               bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(pmap->right));
-               bt_unpinpool (pool);
+
+               bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(set->page->right));
+               bt_unpinpool (set->pool);
                reuse = 1;
        } else {
                new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
@@ -1450,7 +1432,6 @@ int reuse;
        if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
        {
                // use zero buffer to write zeros
-               memset(bt->zero, 0, bt->mgr->page_size);
                if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
                        return bt->err = BTERR_wrt, 0;
        }
@@ -1458,13 +1439,13 @@ int reuse;
        //      bring new page into pool and copy page.
        //      this will extend the file into the new pages.
 
-       if( pool = bt_pinpool (bt, new_page) )
-               pmap = bt_page (bt, pool, new_page);
+       if( set->pool = bt_pinpool (bt, new_page) )
+               set->page = bt_page (bt, set->pool, new_page);
        else
                return 0;
 
-       memcpy(pmap, page, bt->mgr->page_size);
-       bt_unpinpool (pool);
+       memcpy(set->page, page, bt->mgr->page_size);
+       bt_unpinpool (set->pool);
 #endif
        // unlock allocation latch and return new page no
 
@@ -1474,142 +1455,160 @@ int reuse;
 
 //  find slot in page for given key at a given level
 
-int bt_findslot (BtDb *bt, unsigned char *key, uint len)
+int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
 {
-uint diff, higher = bt->page->cnt, low = 1, slot;
-uint good = 0;
+uint diff, higher = set->page->cnt, low = 1, slot;
 
-       //      if no right link
        //        make stopper key an infinite fence value
-       //        by setting the good flag
 
-       if( bt_getid (bt->page->right) )
+       if( bt_getid (set->page->right) )
                higher++;
-       else
-               good++;
 
-       //      low is the next candidate.
+       //      low is the lowest candidate.
        //  loop ends when they meet
 
-       //  if good, higher is already
+       //  higher is already
        //      tested as .ge. the given key.
 
        while( diff = higher - low ) {
                slot = low + ( diff >> 1 );
-               if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
+               if( keycmp (keyptr(set->page, slot), key, len) < 0 )
                        low = slot + 1;
                else
-                       higher = slot, good++;
+                       higher = slot;
        }
 
+       if( higher <= set->page->cnt )
+               return higher;
+
+       //      if leaf page, compare against fence value
+
        //      return zero if key is on right link page
+       //      or return slot beyond last key
+
+       if( set->page->lvl || keycmp ((BtKey)set->page->fence, key, len) < 0 )
+               return 0;
 
-       return good ? higher : 0;
+       return higher;
 }
 
 //  find and load page at given level for given key
 //     leave page rd or wr locked as requested
 
-int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
+int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, uint lock)
 {
 uid page_no = ROOT_page, prevpage = 0;
-BtLatchSet *set, *prevset;
 uint drill = 0xff, slot;
+BtLatchSet *prevlatch;
 uint mode, prevmode;
 BtPool *prevpool;
-int parent = 1;
 
   //  start at root of btree and drill down
 
-  bt->set = NULL;
-
   do {
        // determine lock mode of drill level
        mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
 
-       bt->set = bt_pinlatch (bt, page_no);
-       bt->page_no = page_no;
+       set->latch = bt_pinlatch (bt, page_no);
+       set->page_no = page_no;
 
        // pin page contents
 
-       if( bt->pool = bt_pinpool (bt, page_no) )
-               bt->page = bt_page (bt, bt->pool, page_no);
+       if( set->pool = bt_pinpool (bt, page_no) )
+               set->page = bt_page (bt, set->pool, page_no);
        else
                return 0;
 
        // obtain access lock using lock chaining with Access mode
 
        if( page_no > ROOT_page )
-         bt_lockpage(BtLockAccess, bt->set);
+         bt_lockpage(BtLockAccess, set->latch);
 
        //      release & unpin parent page
 
        if( prevpage ) {
-         bt_unlockpage(prevmode, prevset);
-         bt_unpinlatch (prevset);
+         bt_unlockpage(prevmode, prevlatch);
+         bt_unpinlatch (prevlatch);
          bt_unpinpool (prevpool);
          prevpage = 0;
        }
 
        // obtain read lock using lock chaining
 
-       bt_lockpage(mode, bt->set);
+       bt_lockpage(mode, set->latch);
 
        if( page_no > ROOT_page )
-         bt_unlockpage(BtLockAccess, bt->set);
+         bt_unlockpage(BtLockAccess, set->latch);
 
        // re-read and re-lock root after determining actual level of root
 
-       if( bt->page->lvl != drill) {
-               if ( bt->page_no != ROOT_page )
+       if( set->page->lvl != drill) {
+               if ( set->page_no != ROOT_page )
                        return bt->err = BTERR_struct, 0;
                        
-               drill = bt->page->lvl;
+               drill = set->page->lvl;
 
                if( lock == BtLockWrite && drill == lvl ) {
-                 bt_unlockpage(mode, bt->set);
-                 bt_unpinlatch (bt->set);
-                 bt_unpinpool (bt->pool);
+                 bt_unlockpage(mode, set->latch);
+                 bt_unpinlatch (set->latch);
+                 bt_unpinpool (set->pool);
                  continue;
                }
        }
 
+       prevpage = set->page_no;
+       prevlatch = set->latch;
+       prevpool = set->pool;
+       prevmode = mode;
+
+       //      if page is being deleted and we should continue right
+
+       if( set->page->kill && set->page->goright ) {
+               page_no = bt_getid (set->page->right);
+               continue;
+       }
+
+       //      otherwise, wait for deleted node to clear
+
+       if( set->page->kill ) {
+               bt_unlockpage(mode, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
+               page_no = ROOT_page;
+               prevpage = 0;
+               drill = 0xff;
+#ifdef unix
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
+       
        //  find key on page at this level
        //  and descend to requested level
 
-       if( slot = bt_findslot (bt, key, len) ) {
+       if( slot = bt_findslot (set, key, len) ) {
          if( drill == lvl )
-               return bt->parent = parent, slot;
+               return slot;
+
+         if( slot > set->page->cnt )
+               return bt->err = BTERR_struct;
 
-         while( slotptr(bt->page, slot)->dead )
-               if( slot++ < bt->page->cnt )
+         while( slotptr(set->page, slot)->dead )
+               if( slot++ < set->page->cnt )
                        continue;
-               else {
-                       page_no = bt_getid(bt->page->right);
-                       parent = 0;
-                       goto slideright;
-               }
+               else
+                       return bt->err = BTERR_struct, 0;
 
-         page_no = bt_getid(slotptr(bt->page, slot)->id);
-         parent = 1;
+         page_no = bt_getid(slotptr(set->page, slot)->id);
          drill--;
+         continue;
        }
 
        //  or slide right into next page
 
-       else {
-               page_no = bt_getid(bt->page->right);
-               parent = 0;
-       }
-
-       //  continue down / right using overlapping locks
-       //  to protect pages being split.
-
-slideright:
-       prevpage = bt->page_no;
-       prevpool = bt->pool;
-       prevset = bt->set;
-       prevmode = mode;
+       page_no = bt_getid(set->page->right);
 
   } while( page_no );
 
@@ -1619,189 +1618,287 @@ slideright:
   return 0;    // return error
 }
 
-//  remove empty page from the B-tree
-//     by pulling our right node left over ourselves
+// drill down fixing fence values for left sibling tree
 
-//  call with bt->page, etc, set to page's locked parent
-//     returns with page locked.
+//  call with set write locked
+//     return with set unlocked & unpinned.
 
-BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
+BTERR bt_fixfences (BtDb *bt, BtPageSet *set, unsigned char *newfence)
 {
-BtLatchSet *rset, *pset, *rpset;
-BtPool *rpool, *ppool, *rppool;
-BtPage rpage, ppage, rppage;
-uid right, parent, rparent;
-BtKey ptr;
-uint idx;
+unsigned char oldfence[256];
+BtPageSet next[1];
+int chk;
+
+  memcpy (oldfence, set->page->fence, 256);
+
+  while( !set->page->kill && set->page->lvl ) {
+       next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
+       next->latch = bt_pinlatch (bt, next->page_no);
+       bt_lockpage (BtLockParent, next->latch);
+       bt_lockpage (BtLockAccess, next->latch);
+       bt_lockpage (BtLockWrite, next->latch);
+       bt_unlockpage (BtLockAccess, next->latch);
+
+       if( next->pool = bt_pinpool (bt, next->page_no) )
+               next->page = bt_page (bt, next->pool, next->page_no);
+       else
+               return bt->err;
 
-       //      cache node's parent page
+       chk = keycmp ((BtKey)next->page->fence, oldfence + 1, *oldfence);
 
-       parent = bt->page_no;
-       ppage = bt->page;
-       ppool = bt->pool;
-       pset = bt->set;
+       if( chk < 0 ) {
+         next->page_no = bt_getid (next->page->right);
+         bt_unlockpage (BtLockWrite, next->latch);
+         bt_unlockpage (BtLockParent, next->latch);
+         bt_unpinlatch (next->latch);
+         bt_unpinpool (next->pool);
+         continue;
+       }
 
-       // lock and map our right page
-       //      it cannot be NULL because of the stopper
-       //      in the last right page
+       if( chk > 0 )
+               return bt->err = BTERR_struct;
 
-       bt_lockpage (BtLockWrite, set);
+       if( bt_fixfences (bt, next, newfence) )
+               return bt->err;
 
-       // if we aren't dead yet
+       break;
+  }
 
-       if( page->act )
-               goto rmergexit;
+  memcpy (set->page->fence, newfence, 256);
 
-       if( right = bt_getid (page->right) )
-         if( rpool = bt_pinpool (bt, right) )
-               rpage = bt_page (bt, rpool, right);
-         else
-               return bt->err;
-       else
-               return bt->err = BTERR_struct;
+  bt_unlockpage (BtLockWrite, set->latch);
+  bt_unlockpage (BtLockParent, set->latch);
+  bt_unpinlatch (set->latch);
+  bt_unpinpool (set->pool);
+  return 0;
+}
 
-       rset = bt_pinlatch (bt, right);
+//     return page to free list
+//     page must be delete & write locked
 
-       //      find our right neighbor
+void bt_freepage (BtDb *bt, BtPageSet *set)
+{
+       //      lock allocation page
 
-       if( ppage->act > 1 ) {
-        for( idx = slot; idx++ < ppage->cnt; )
-         if( !slotptr(ppage, idx)->dead )
-               break;
+       bt_spinwritelock (bt->mgr->latchmgr->lock);
 
-        if( idx > ppage->cnt )
-               return bt->err = BTERR_struct;
+       //      store chain in second right
+       bt_putid(set->page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+       bt_putid(bt->mgr->latchmgr->alloc[1].right, set->page_no);
+       set->page->free = 1;
 
-        //  redirect right neighbor in parent to left node
+       // unlock released page
 
-        bt_putid(slotptr(ppage,idx)->id, page_no);
-       }
+       bt_unlockpage (BtLockDelete, set->latch);
+       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
 
-       //      if parent has only our deleted page, e.g. no right neighbor
-       //      prepare to merge parent itself
+       // unlock allocation page
 
-       if( ppage->act == 1 ) {
-         if( rparent = bt_getid (ppage->right) )
-          if( rppool = bt_pinpool (bt, rparent) )
-               rppage = bt_page (bt, rppool, rparent);
-          else
-               return bt->err;
+       bt_spinreleasewrite (bt->mgr->latchmgr->lock);
+}
+
+//     remove the root level by promoting its only child
+//     call with parent and child pages
+
+BTERR bt_removeroot (BtDb *bt, BtPageSet *root, BtPageSet *child)
+{
+uid next = 0;
+
+  do {
+       if( next ) {
+         child->latch = bt_pinlatch (bt, next);
+         bt_lockpage (BtLockDelete, child->latch);
+         bt_lockpage (BtLockWrite, child->latch);
+
+         if( child->pool = bt_pinpool (bt, next) )
+               child->page = bt_page (bt, child->pool, next);
          else
-               return bt->err = BTERR_struct;
+               return bt->err;
+
+         child->page_no = next;
+       }
+
+       memcpy (root->page, child->page, bt->mgr->page_size);
+       next = bt_getid (slotptr(child->page, child->page->cnt)->id);
+       bt_freepage (bt, child);
+  } while( root->page->lvl > 1 && root->page->cnt == 1 );
+
+  bt_unlockpage (BtLockWrite, root->latch);
+  bt_unpinlatch (root->latch);
+  bt_unpinpool (root->pool);
+  return 0;
+}
+
+//  pull right page over ourselves in simple merge
+
+BTERR bt_mergeright (BtDb *bt, BtPageSet *set, BtPageSet *parent, BtPageSet *right, uint slot, uint idx)
+{
+       //  install ourselves as child page
+       //      and delete ourselves from parent
 
-         rpset = bt_pinlatch (bt, rparent);
-         bt_lockpage (BtLockWrite, rpset);
+       bt_putid (slotptr(parent->page, idx)->id, set->page_no);
+       slotptr(parent->page, slot)->dead = 1;
+       parent->page->act--;
 
-         // find our right neighbor on right parent page
+       //      collapse any empty slots
 
-         for( idx = 0; idx++ < rppage->cnt; )
-               if( !slotptr(rppage, idx)->dead ) {
-                 bt_putid (slotptr(rppage, idx)->id, page_no);
+       while( idx = parent->page->cnt - 1 )
+         if( slotptr(parent->page, idx)->dead ) {
+               *slotptr(parent->page, idx) = *slotptr(parent->page, idx + 1);
+               memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
+         } else
                  break;
-               }
 
-         if( idx > rppage->cnt )
-               return bt->err = BTERR_struct;
-       }
+       memcpy (set->page, right->page, bt->mgr->page_size);
+       bt_unlockpage (BtLockParent, right->latch);
 
-       //      now that there are no more pointers to our right node
-       //      we can wait for delete lock on it
+       bt_freepage (bt, right);
 
-       bt_lockpage(BtLockDelete, rset);
-       bt_lockpage(BtLockWrite, rset);
+       //      do we need to remove a btree level?
+       //      (leave the first page of leaves alone)
 
-       // pull contents of right page into our empty page 
+       if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
+         if( set->page->lvl )
+               return bt_removeroot (bt, parent, set);
 
-       memcpy (page, rpage, bt->mgr->page_size);
+       bt_unlockpage (BtLockWrite, parent->latch);
+       bt_unlockpage (BtLockDelete, set->latch);
+       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (parent->latch);
+       bt_unpinpool (parent->pool);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
+       return 0;
+}
 
-       // ready to release right parent lock
-       //      now that we have a new page in place
+//     remove both child and parent from the btree
+//     from the fence position in the parent
+//     call with both pages locked for writing
 
-       if( ppage->act == 1 ) {
-         bt_unlockpage (BtLockWrite, rpset);
-         bt_unpinlatch (rpset);
-         bt_unpinpool (rppool);
-       }
+BTERR bt_removeparent (BtDb *bt, BtPageSet *child, BtPageSet *parent, BtPageSet *right, BtPageSet *rparent, uint lvl)
+{
+unsigned char pagefence[256];
+uint idx;
 
-       //      add killed right block to free chain
-       //      lock latch mgr
+       //  pull right sibling over ourselves and unlock
 
-       bt_spinwritelock(bt->mgr->latchmgr->lock);
+       memcpy (child->page, right->page, bt->mgr->page_size);
 
-       //      store free chain in allocation page second right
+       bt_unlockpage (BtLockWrite, child->latch);
+       bt_unpinlatch (child->latch);
+       bt_unpinpool (child->pool);
 
-       bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
-       bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+       //  install ourselves into right link of old right page
 
-       // unlock latch mgr and right page
+       bt_putid (right->page->right, child->page_no);
+       right->page->goright = 1;       // tell bt_loadpage to go right to us
+       right->page->kill = 1;
 
-       bt_unlockpage(BtLockDelete, rset);
-       bt_unlockpage(BtLockWrite, rset);
-       bt_unpinlatch (rset);
-       bt_unpinpool (rpool);
+       bt_unlockpage (BtLockWrite, right->latch);
 
-       bt_spinreleasewrite(bt->mgr->latchmgr->lock);
+       //      remove our slot from our parent
+       //      signal to move right
 
-       // delete our obsolete fence key from our parent
+       parent->page->goright = 1;      // tell bt_loadpage to go right to rparent
+       parent->page->kill = 1;
+       parent->page->act--;
 
-       slotptr(ppage, slot)->dead = 1;
-       ppage->dirty = 1;
+       //      redirect right page pointer in right parent to us
 
-       //      if our parent now empty
-       //      remove it from the tree
+       for( idx = 0; idx++ < rparent->page->cnt; )
+         if( !slotptr(rparent->page, idx)->dead )
+               break;
 
-       if( ppage->act-- == 1 )
-         if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
-               return bt->err;
+       if( bt_getid (slotptr(rparent->page, idx)->id) != right->page_no )
+               return bt->err = BTERR_struct;
 
-rmergexit:
-       bt_unlockpage (BtLockWrite, pset);
-       bt_unpinlatch (pset);
-       bt_unpinpool (ppool);
+       bt_putid (slotptr(rparent->page, idx)->id, child->page_no);
+       bt_unlockpage (BtLockWrite, rparent->latch);
+       bt_unpinlatch (rparent->latch);
+       bt_unpinpool (rparent->pool);
 
-       bt->found = 1;
-       return bt->err = 0;
-}
+       //      free the right page
+
+       bt_lockpage (BtLockDelete, right->latch);
+       bt_lockpage (BtLockWrite, right->latch);
+       bt_freepage (bt, right);
 
-//  remove empty page from the B-tree
-//     try merging left first.  If no left
-//     sibling, then merge right.
+       //      save parent page fence value
 
-//     call with page loaded and locked,
-//  return with page locked.
+       memcpy (pagefence, parent->page->fence, 256);
+       bt_unlockpage (BtLockWrite, parent->latch);
+
+       return bt_removepage (bt, parent, lvl, pagefence);
+}
 
-BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
+//     remove page from btree
+//     call with page unlocked
+//     returns with page on free list
+
+BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence)
 {
-unsigned char fencekey[256], postkey[256];
-uint slot, idx, postfence = 0;
-BtLatchSet *lset, *pset;
-BtPool *lpool, *ppool;
-BtPage lpage, ppage;
-uid left, parent;
+BtPageSet parent[1], sibling[1], rparent[1];
+unsigned char newfence[256];
+uint slot, idx;
 BtKey ptr;
 
-       ptr = keyptr(page, page->cnt);
-       memcpy(fencekey, ptr, ptr->len + 1);
-       bt_unlockpage (BtLockWrite, set);
-
        //      load and lock our parent
 
 retry:
-       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
+       if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
                return bt->err;
 
-       parent = bt->page_no;
-       ppage = bt->page;
-       ppool = bt->pool;
-       pset = bt->set;
+       //      can we do a simple merge entirely
+       //      between siblings on the parent page?
 
-       //      wait until we are posted in our parent
+       if( slot < parent->page->cnt ) {
+               // find our right neighbor
+               //      right must exist because the stopper prevents
+               //      the rightmost page from deleting
 
-       if( !bt->parent ) {
-               bt_unlockpage (BtLockWrite, pset);
-               bt_unpinlatch (pset);
-               bt_unpinpool (ppool);
-#ifdef unix
+               for( idx = slot; idx++ < parent->page->cnt; )
+                 if( !slotptr(parent->page, idx)->dead )
+                       break;
+
+               sibling->page_no = bt_getid (slotptr (parent->page, idx)->id);
+
+               bt_lockpage (BtLockDelete, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
+
+               //      merge right if sibling shows up in
+               //  our parent and is not being killed
+
+               if( sibling->page_no == bt_getid (set->page->right) ) {
+                 sibling->latch = bt_pinlatch (bt, sibling->page_no);
+                 bt_lockpage (BtLockParent, sibling->latch);
+                 bt_lockpage (BtLockDelete, sibling->latch);
+                 bt_lockpage (BtLockWrite, sibling->latch);
+
+                 if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+                       sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+                 else
+                       return bt->err;
+
+                 if( !sibling->page->kill )
+                       return bt_mergeright(bt, set, parent, sibling, slot, idx);
+
+                 //  try again later
+
+                 bt_unlockpage (BtLockWrite, sibling->latch);
+                 bt_unlockpage (BtLockParent, sibling->latch);
+                 bt_unlockpage (BtLockDelete, sibling->latch);
+                 bt_unpinlatch (sibling->latch);
+                 bt_unpinpool (sibling->pool);
+               }
+
+               bt_unlockpage (BtLockDelete, set->latch);
+               bt_unlockpage (BtLockWrite, set->latch);
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+#ifdef linux
                sched_yield();
 #else
                SwitchToThread();
@@ -1812,35 +1909,59 @@ retry:
        //  find our left neighbor in our parent page
 
        for( idx = slot; --idx; )
-         if( !slotptr(ppage, idx)->dead )
+         if( !slotptr(parent->page, idx)->dead )
                break;
 
-       //      if no left neighbor, do right merge
+       //      if no left neighbor, delete ourselves and our parent
 
-       if( !idx )
-               return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
+       if( !idx ) {
+               bt_lockpage (BtLockAccess, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
+               bt_unlockpage (BtLockAccess, set->latch);
 
-       // lock and map our left neighbor's page
+               rparent->page_no = bt_getid (parent->page->right);
+               rparent->latch = bt_pinlatch (bt, rparent->page_no);
 
-       left = bt_getid (slotptr(ppage, idx)->id);
+               bt_lockpage (BtLockAccess, rparent->latch);
+               bt_lockpage (BtLockWrite, rparent->latch);
+               bt_unlockpage (BtLockAccess, rparent->latch);
 
-       if( lpool = bt_pinpool (bt, left) )
-               lpage = bt_page (bt, lpool, left);
-       else
-               return bt->err;
+               if( rparent->pool = bt_pinpool (bt, rparent->page_no) )
+                       rparent->page = bt_page (bt, rparent->pool, rparent->page_no);
+               else
+                       return bt->err;
+
+               if( !rparent->page->kill ) {
+                 sibling->page_no = bt_getid (set->page->right);
+                 sibling->latch = bt_pinlatch (bt, sibling->page_no);
+
+                 bt_lockpage (BtLockAccess, sibling->latch);
+                 bt_lockpage (BtLockWrite, sibling->latch);
+                 bt_unlockpage (BtLockAccess, sibling->latch);
+
+                 if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+                       sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+                 else
+                       return bt->err;
+
+                 if( !sibling->page->kill )
+                       return bt_removeparent (bt, set, parent, sibling, rparent, lvl+1);
+
+                 //  try again later
 
-       lset = bt_pinlatch (bt, left);
-       bt_lockpage(BtLockWrite, lset);
+                 bt_unlockpage (BtLockWrite, sibling->latch);
+                 bt_unpinlatch (sibling->latch);
+                 bt_unpinpool (sibling->pool);
+               }
 
-       //  wait until sibling is in our parent
+               bt_unlockpage (BtLockWrite, set->latch);
+               bt_unlockpage (BtLockWrite, rparent->latch);
+               bt_unpinlatch (rparent->latch);
+               bt_unpinpool (rparent->pool);
 
-       if( bt_getid (lpage->right) != page_no ) {
-               bt_unlockpage (BtLockWrite, pset);
-               bt_unpinlatch (pset);
-               bt_unpinpool (ppool);
-               bt_unlockpage (BtLockWrite, lset);
-               bt_unpinlatch (lset);
-               bt_unpinpool (lpool);
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
 #ifdef linux
                sched_yield();
 #else
@@ -1849,114 +1970,97 @@ retry:
                goto retry;
        }
 
-       //  since our page will have no more pointers to it,
-       //      obtain Delete lock and wait for write locks to clear
-
-       bt_lockpage(BtLockDelete, set);
-       bt_lockpage(BtLockWrite, set);
+       // redirect parent to our left sibling
+       // lock and map our left sibling's page
 
-       //      if we aren't dead yet,
-       //      get ready for exit
-
-       if( page->act ) {
-               bt_unlockpage(BtLockDelete, set);
-               bt_unlockpage(BtLockWrite, lset);
-               bt_unpinlatch (lset);
-               bt_unpinpool (lpool);
-               goto lmergexit;
-       }
+       sibling->page_no = bt_getid (slotptr(parent->page, idx)->id);
+       sibling->latch = bt_pinlatch (bt, sibling->page_no);
 
-       //      are we are the fence key for our parent?
-       //      if so, grab our old fence key
+       //      wait our turn on fence key maintenance
 
-       if( postfence = slot == ppage->cnt ) {
-               ptr = keyptr (ppage, ppage->cnt);
-               memcpy(fencekey, ptr, ptr->len + 1);
-               memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
+       bt_lockpage(BtLockParent, sibling->latch);
+       bt_lockpage(BtLockAccess, sibling->latch);
+       bt_lockpage(BtLockWrite, sibling->latch);
+       bt_unlockpage(BtLockAccess, sibling->latch);
 
-               // clear out other dead slots
-
-               while( --ppage->cnt )
-                 if( slotptr(ppage, ppage->cnt)->dead )
-                       memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
-                 else
-                       break;
-
-               ptr = keyptr (ppage, ppage->cnt);
-               memcpy(postkey, ptr, ptr->len + 1);
-       } else
-               slotptr(ppage,slot)->dead = 1;
-
-       ppage->dirty = 1;
-       ppage->act--;
-
-       //      push our right neighbor pointer to our left
-
-       memcpy (lpage->right, page->right, BtId);
+       if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+               sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+       else
+               return bt->err;
 
-       //      add ourselves to free chain
-       //      lock latch mgr
+       //  wait until left sibling is in our parent
 
-       bt_spinwritelock(bt->mgr->latchmgr->lock);
+       if( bt_getid (sibling->page->right) != set->page_no ) {
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unlockpage (BtLockWrite, sibling->latch);
+               bt_unlockpage (BtLockParent, sibling->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+               bt_unpinlatch (sibling->latch);
+               bt_unpinpool (sibling->pool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               goto retry;
+       }
 
-       //      store free chain in allocation page second right
-       bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
-       bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
+       //      delete our left sibling from parent
 
-       // unlock latch mgr and pages
+       slotptr(parent->page,idx)->dead = 1;
+       parent->page->dirty = 1;
+       parent->page->act--;
 
-       bt_spinreleasewrite(bt->mgr->latchmgr->lock);
-       bt_unlockpage(BtLockWrite, lset);
-       bt_unpinlatch (lset);
-       bt_unpinpool (lpool);
+       //      redirect our parent slot to our left sibling
 
-       // release our node's delete lock
+       bt_putid (slotptr(parent->page, slot)->id, sibling->page_no);
+       memcpy (sibling->page->right, set->page->right, BtId);
 
-       bt_unlockpage(BtLockDelete, set);
+       //      collapse dead slots from parent
 
-lmergexit:
-       bt_unlockpage (BtLockWrite, pset);
-       bt_unpinpool (ppool);
+       while( idx = parent->page->cnt - 1 )
+         if( slotptr(parent->page, idx)->dead ) {
+               *slotptr(parent->page, idx) = *slotptr(parent->page, parent->page->cnt);
+               memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
+         } else
+                 break;
 
-       //  do we need to post parent's fence key in its parent?
+       //  free our original page
 
-       if( !postfence || parent == ROOT_page ) {
-               bt_unpinlatch (pset);
-               bt->found = 1;
-               return bt->err = 0;
-       }
+       bt_lockpage (BtLockDelete, set->latch);
+       bt_lockpage (BtLockWrite, set->latch);
+       bt_freepage (bt, set);
 
-       //      interlock parent fence post
+       //      go down the left node's fence keys to the leaf level
+       //      and update the fence keys in each page
 
-       bt_lockpage (BtLockParent, pset);
+       memcpy (newfence, parent->page->fence, 256);
 
-       //      load parent's parent page
-posttry:
-       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
+       if( bt_fixfences (bt, sibling, newfence) )
                return bt->err;
 
-       if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
-         if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
-               return bt->err;
-         else
-               goto posttry;
+       //  promote sibling as new root?
 
-       page = bt->page;
+       if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
+        if( sibling->page->lvl ) {
+         sibling->latch = bt_pinlatch (bt, sibling->page_no);
+         bt_lockpage (BtLockDelete, sibling->latch);
+         bt_lockpage (BtLockWrite, sibling->latch);
 
-       page->min -= *postkey + 1;
-       ((unsigned char *)page)[page->min] = *postkey;
-       memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
-       slotptr(page, slot)->off = page->min;
+         if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+               sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+         else
+               return bt->err;
 
-       bt_unlockpage (BtLockParent, pset);
-       bt_unpinlatch (pset);
+         return bt_removeroot (bt, parent, sibling);
+        }
 
-       bt_unlockpage (BtLockWrite, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage (BtLockWrite, parent->latch);
+       bt_unpinlatch (parent->latch);
+       bt_unpinpool (parent->pool);
 
-       bt->found = 1;
-       return bt->err = 0;
+       return 0;
 }
 
 //  find and delete key on page by marking delete flag bit
@@ -1964,65 +2068,79 @@ posttry:
 
 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
 {
-BtLatchSet *set;
-BtPool *pool;
-BtPage page;
-uid page_no;
+unsigned char pagefence[256];
+uint slot, idx, found;
+BtPageSet set[1];
 BtKey ptr;
-uint slot;
 
-       if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
+       if( slot = bt_loadpage (bt, set, key, len, 0, BtLockWrite) )
+               ptr = keyptr(set->page, slot);
+       else
                return bt->err;
 
-       page_no = bt->page_no;
-       page = bt->page;
-       pool = bt->pool;
-       set = bt->set;
-
        // if key is found delete it, otherwise ignore request
 
-       ptr = keyptr(page, slot);
+       if( found = slot <= set->page->cnt )
+         if( found = !keycmp (ptr, key, len) )
+               if( found = slotptr(set->page, slot)->dead == 0 ) {
+                 slotptr(set->page,slot)->dead = 1;
+                 set->page->dirty = 1;
+                 set->page->act--;
+
+                 // collapse empty slots
 
-       if( bt->found = !keycmp (ptr, key, len) )
-         if( bt->found = slotptr(page, slot)->dead == 0 ) {
-               slotptr(page,slot)->dead = 1;
-                 if( slot < page->cnt )
-                       page->dirty = 1;
-                 if( !--page->act )
-                       if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
-                         return bt->err;
+                 while( idx = set->page->cnt - 1 )
+                       if( slotptr(set->page, idx)->dead ) {
+                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
+                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
+                       } else
+                               break;
                }
 
-       bt_unlockpage(BtLockWrite, set);
-       bt_unpinlatch (set);
-       bt_unpinpool (pool);
-       return bt->err = 0;
+       if( set->page->act ) {
+               bt_unlockpage(BtLockWrite, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
+               return bt->found = found, 0;
+       }
+
+       memcpy (pagefence, set->page->fence, 256);
+       set->page->kill = 1;
+
+       bt_unlockpage (BtLockWrite, set->latch);
+
+       if( bt_removepage (bt, set, 0, pagefence) )
+               return bt->err;
+
+       bt->found = found;
+       return 0;
 }
 
 //     find key in leaf level and return row-id
 
 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
 {
+BtPageSet set[1];
 uint  slot;
 BtKey ptr;
 uid id;
 
-       if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
-               ptr = keyptr(bt->page, slot);
+       if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
+               ptr = keyptr(set->page, slot);
        else
                return 0;
 
        // if key exists, return row-id
        //      otherwise return 0
 
-       if( slot <= bt->page->cnt && !keycmp (ptr, key, len) )
-               id = bt_getid(slotptr(bt->page,slot)->id);
+       if( !keycmp (ptr, key, len) )
+               id = bt_getid(slotptr(set->page,slot)->id);
        else
                id = 0;
 
-       bt_unlockpage (BtLockRead, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage (BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        return id;
 }
 
@@ -2033,7 +2151,7 @@ uid id;
 
 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
 {
-uint nxt = bt->mgr->page_size;
+uint nxt = bt->mgr->page_size, off;
 uint cnt = 0, idx = 0;
 uint max = page->cnt;
 uint newslot = max;
@@ -2056,28 +2174,31 @@ BtKey key;
        page->act = 0;
 
        // try cleaning up page first
-
-       // always leave fence key in the array
-       // otherwise, remove deleted key
+       // by removing deleted keys
 
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 1;
-               if( cnt < max && slotptr(bt->frame,cnt)->dead )
+               if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
-               // copy key
+               // if its not the fence key,
+               // copy the key across
 
-               key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)page + nxt, key, key->len + 1);
+               off = slotptr(bt->frame,cnt)->off;
+
+               if( off >= sizeof(*page) ) {
+                       key = keyptr(bt->frame, cnt);
+                       off = nxt -= key->len + 1;
+                       memcpy ((unsigned char *)page + nxt, key, key->len + 1);
+               }
 
                // copy slot
+
                memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
-               if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
-                       page->act++;
                slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
-               slotptr(page, idx)->off = nxt;
+               slotptr(page, idx)->off = off;
+               page->act++;
        }
 
        page->min = nxt;
@@ -2091,202 +2212,192 @@ BtKey key;
        return 0;
 }
 
-//     add key to current page
-//     page must already be writelocked
+// split the root and raise the height of the btree
 
-void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
-{
-uint idx;
-
-       // find next available dead slot and copy key onto page
-
-       for( idx = slot; idx < page->cnt; idx++ )
-         if( slotptr(page, idx)->dead )
-               break;
-
-       if( idx == page->cnt )
-               idx++, page->cnt++;
-
-       page->act++;
-
-       // now insert key into array before slot
-
-       while( idx > slot )
-               *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
-
-       page->min -= len + 1;
-       ((unsigned char *)page)[page->min] = len;
-       memcpy ((unsigned char *)page + page->min +1, key, len );
-
-       bt_putid(slotptr(page,slot)->id, id);
-       slotptr(page, slot)->off = page->min;
-       slotptr(page, slot)->tod = tod;
-       slotptr(page, slot)->dead = 0;
-}
-
-BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
+BTERR bt_splitroot(BtDb *bt, BtPageSet *root, uid page_no2)
 {
 uint nxt = bt->mgr->page_size;
-BtPage root = bt->page;
+unsigned char leftkey[256];
 uid new_page;
 
        //  Obtain an empty page to use, and copy the current
-       //  root contents into it
+       //  root contents into it, e.g. lower keys
 
-       if( !(new_page = bt_newpage(bt, root)) )
+       memcpy (leftkey, root->page->fence, 256);
+       root->page->posted = 1;
+
+       if( !(new_page = bt_newpage(bt, root->page)) )
                return bt->err;
 
        // preserve the page info at the bottom
-       // and set rest to zero
+       // of higher keys and set rest to zero
 
-       memset(root+1, 0, bt->mgr->page_size - sizeof(*root));
+       memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
+       memset(root->page->fence, 0, 256);
+       root->page->fence[0] = 2;
+       root->page->fence[1] = 0xff;
+       root->page->fence[2] = 0xff;
 
-       // insert first key on newroot page
+       // insert lower keys page fence key on newroot page
 
        nxt -= *leftkey + 1;
-       memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
-       bt_putid(slotptr(root, 1)->id, new_page);
-       slotptr(root, 1)->off = nxt;
+       memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
+       bt_putid(slotptr(root->page, 1)->id, new_page);
+       slotptr(root->page, 1)->off = nxt;
        
-       // insert second key (stopper key) on newroot page
+       // insert stopper key on newroot page
        // and increase the root height
 
-       nxt -= 3;
-       *((unsigned char *)root + nxt) = 2;
-       memset ((unsigned char *)root + nxt + 1, 0xff, 2);
-       bt_putid(slotptr(root, 2)->id, page_no2);
-       slotptr(root, 2)->off = nxt;
+       bt_putid(slotptr(root->page, 2)->id, page_no2);
+       slotptr(root->page, 2)->off = offsetof(struct BtPage_, fence);
 
-       bt_putid(root->right, 0);
-       root->min = nxt;                // reset lowest used offset and key count
-       root->cnt = 2;
-       root->act = 2;
-       root->lvl++;
+       bt_putid(root->page->right, 0);
+       root->page->min = nxt;          // reset lowest used offset and key count
+       root->page->cnt = 2;
+       root->page->act = 2;
+       root->page->lvl++;
 
-       // release and unpin root (bt->page)
+       // release and unpin root
 
-       bt_unlockpage(BtLockWrite, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage(BtLockWrite, root->latch);
+       bt_unpinlatch (root->latch);
+       bt_unpinpool (root->pool);
        return 0;
 }
 
 //  split already locked full node
-//     return unlocked and unpinned.
+//     return unlocked.
 
-BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
+BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
 {
-uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
-unsigned char rightkey[256], leftkey[256];
-uint tod = time(NULL);
-uint lvl = page->lvl;
-uid new_page;
+uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size, off;
+unsigned char fencekey[256];
+uint lvl = set->page->lvl;
+uid right;
 BtKey key;
 
-       //      initialize frame buffer for right node
+       //  split higher half of keys to bt->frame
 
        memset (bt->frame, 0, bt->mgr->page_size);
-       max = page->cnt;
+       max = set->page->cnt;
        cnt = max / 2;
        idx = 0;
 
-       //  split higher half of keys to bt->frame
-
        while( cnt++ < max ) {
-               key = keyptr(page, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
-               memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
-               if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
-                       bt->frame->act++;
-               slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
-               slotptr(bt->frame, idx)->off = nxt;
+               if( !lvl || cnt < max ) {
+                       key = keyptr(set->page, cnt);
+                       off = nxt -= key->len + 1;
+                       memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
+               } else
+                       off = offsetof(struct BtPage_, fence);
+
+               memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId);
+               slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod;
+               slotptr(bt->frame, idx)->off = off;
+               bt->frame->act++;
        }
 
-       // transfer right link node to new right node
-
-       if( page_no > ROOT_page )
-               memcpy (bt->frame->right, page->right, BtId);
+       if( set->page_no == ROOT_page )
+               bt->frame->posted = 1;
 
+       memcpy (bt->frame->fence, set->page->fence, 256);
        bt->frame->bits = bt->mgr->page_bits;
        bt->frame->min = nxt;
        bt->frame->cnt = idx;
        bt->frame->lvl = lvl;
 
-       //      get new free page and write right frame to it.
+       // link right node
 
-       if( !(new_page = bt_newpage(bt, bt->frame)) )
-               return bt->err;
+       if( set->page_no > ROOT_page )
+               memcpy (bt->frame->right, set->page->right, BtId);
 
-       //      remember fence key for new right page to add
-       //      as right sibling to the left node
+       //      get new free page and write higher keys to it.
 
-       key = keyptr(bt->frame, idx);
-       memcpy (rightkey, key, key->len + 1);
+       if( !(right = bt_newpage(bt, bt->frame)) )
+               return bt->err;
 
        //      update lower keys to continue in old page
 
-       memcpy (bt->frame, page, bt->mgr->page_size);
-       memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
+       memcpy (bt->frame, set->page, bt->mgr->page_size);
+       memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
        nxt = bt->mgr->page_size;
-       page->dirty = 0;
-       page->act = 0;
+       set->page->posted = 0;
+       set->page->dirty = 0;
+       set->page->act = 0;
        cnt = 0;
        idx = 0;
 
        //  assemble page of smaller keys
-       //      to remain in the old page
 
        while( cnt++ < max / 2 ) {
                key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)page + nxt, key, key->len + 1);
-               memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
-               if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
-                       page->act++;
-               slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
-               slotptr(page, idx)->off = nxt;
-       }
 
-       // finalize left page and save fence key
+               if( !lvl || cnt < max / 2 ) {
+                       off = nxt -= key->len + 1;
+                       memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
+               } else 
+                       off = offsetof(struct BtPage_, fence);
 
-       memcpy(leftkey, key, key->len + 1);
-       page->min = nxt;
-       page->cnt = idx;
+               memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
+               slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod;
+               slotptr(set->page, idx)->off = off;
+               set->page->act++;
+       }
 
-       //      link new right page
+       // install fence key for smaller key page
 
-       bt_putid (page->right, new_page);
+       memset(set->page->fence, 0, 256);
+       memcpy(set->page->fence, key, key->len + 1);
+
+       bt_putid(set->page->right, right);
+       set->page->min = nxt;
+       set->page->cnt = idx;
 
        // if current page is the root page, split it
 
-       if( page_no == ROOT_page )
-               return bt_splitroot (bt, leftkey, new_page);
+       if( set->page_no == ROOT_page )
+               return bt_splitroot (bt, set, right);
+
+       bt_unlockpage (BtLockWrite, set->latch);
 
-       //  obtain ParentModification lock for current page
+       // insert new fences in their parent pages
+
+       while( 1 ) {
+               bt_lockpage (BtLockParent, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
 
-       bt_lockpage (BtLockParent, set);
+               memcpy (fencekey, set->page->fence, 256);
+               right = bt_getid (set->page->right);
 
-       //  release wr lock on our page.
-       //      this will keep out another SMO
+               if( set->page->posted ) {
+                       bt_unlockpage (BtLockParent, set->latch);
+                       bt_unlockpage (BtLockWrite, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
+                       return 0;
+               }
 
-       bt_unlockpage (BtLockWrite, set);
+               set->page->posted = 1;
+               bt_unlockpage (BtLockWrite, set->latch);
 
-       //  insert key for old page (lower keys)
+               if( bt_insertkey (bt, fencekey+1, *fencekey, set->page_no, time(NULL), lvl+1) )
+                       return bt->err;
 
-       if( bt_insertkey (bt, leftkey + 1, *leftkey, page_no, tod, lvl + 1) )
-               return bt->err;
+               bt_unlockpage (BtLockParent, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
 
-       //      switch old parent key from us to our right page
+               if( !(set->page_no = right) )
+                       break;
 
-       if( bt_insertkey (bt, rightkey + 1, *rightkey, new_page, tod, lvl + 1) )
-               return bt->err;
+               set->latch = bt_pinlatch (bt, right);
 
-       //      unlock and unpin
+               if( set->pool = bt_pinpool (bt, right) )
+                       set->page = bt_page (bt, set->pool, right);
+               else
+                       return bt->err;
+       }
 
-       bt_unlockpage (BtLockParent, set);
-       bt_unpinlatch (set);
-       bt_unpinpool (pool);
        return 0;
 }
 
@@ -2294,13 +2405,13 @@ BtKey key;
 
 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
 {
+BtPageSet set[1];
 uint slot, idx;
-BtPage page;
 BtKey ptr;
 
        while( 1 ) {
-               if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
-                       ptr = keyptr(bt->page, slot);
+               if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
+                       ptr = keyptr(set->page, slot);
                else
                {
                        if ( !bt->err )
@@ -2310,34 +2421,56 @@ BtKey ptr;
 
                // if key already exists, update id and return
 
-               page = bt->page;
-
-               if( !keycmp (ptr, key, len) ) {
-                       if( slotptr(page, slot)->dead )
-                               page->act++;
-                       slotptr(page, slot)->dead = 0;
-                       slotptr(page, slot)->tod = tod;
-                       bt_putid(slotptr(page,slot)->id, id);
-                       bt_unlockpage(BtLockWrite, bt->set);
-                       bt_unpinlatch (bt->set);
-                       bt_unpinpool (bt->pool);
-                       return bt->err;
+               if( slot <= set->page->cnt )
+                 if( !keycmp (ptr, key, len) ) {
+                       if( slotptr(set->page, slot)->dead )
+                               set->page->act++;
+                       slotptr(set->page, slot)->dead = 0;
+                       slotptr(set->page, slot)->tod = tod;
+                       bt_putid(slotptr(set->page,slot)->id, id);
+                       bt_unlockpage(BtLockWrite, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
+                       return 0;
                }
 
                // check if page has enough space
 
-               if( slot = bt_cleanpage (bt, bt->page, len, slot) )
+               if( slot = bt_cleanpage (bt, set->page, len, slot) )
                        break;
 
-               if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+               if( bt_splitpage (bt, set) )
                        return bt->err;
        }
 
-       bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
+       // calculate next available slot and copy key into page
+
+       set->page->min -= len + 1; // reset lowest used offset
+       ((unsigned char *)set->page)[set->page->min] = len;
+       memcpy ((unsigned char *)set->page + set->page->min +1, key, len );
+
+       for( idx = slot; idx <= set->page->cnt; idx++ )
+         if( slotptr(set->page, idx)->dead )
+               break;
+
+       // now insert key into array before slot
 
-       bt_unlockpage (BtLockWrite, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       if( idx > set->page->cnt )
+               set->page->cnt++;
+
+       set->page->act++;
+
+       while( idx > slot )
+               *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
+
+       bt_putid(slotptr(set->page,slot)->id, id);
+       slotptr(set->page, slot)->off = set->page->min;
+       slotptr(set->page, slot)->tod = tod;
+       slotptr(set->page, slot)->dead = 0;
+
+       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        return 0;
 }
 
@@ -2345,17 +2478,21 @@ BtKey ptr;
 
 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
 {
+BtPageSet set[1];
 uint slot;
 
        // cache page for retrieval
-       if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
-               memcpy (bt->cursor, bt->page, bt->mgr->page_size);
 
-       bt->cursor_page = bt->page_no;
+       if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
+         memcpy (bt->cursor, set->page, bt->mgr->page_size);
+       else
+         return 0;
+
+       bt->cursor_page = set->page_no;
 
-       bt_unlockpage(BtLockRead, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage(BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        return slot;
 }
 
@@ -2364,9 +2501,7 @@ uint slot;
 
 uint bt_nextkey (BtDb *bt, uint slot)
 {
-BtLatchSet *set;
-BtPool *pool;
-BtPage page;
+BtPageSet set[1];
 uid right;
 
   do {
@@ -2374,7 +2509,7 @@ uid right;
        while( slot++ < bt->cursor->cnt )
          if( slotptr(bt->cursor,slot)->dead )
                continue;
-         else if( right || (slot < bt->cursor->cnt) )
+         else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
                return slot;
          else
                break;
@@ -2383,19 +2518,20 @@ uid right;
                break;
 
        bt->cursor_page = right;
-       if( pool = bt_pinpool (bt, right) )
-               page = bt_page (bt, pool, right);
+
+       if( set->pool = bt_pinpool (bt, right) )
+               set->page = bt_page (bt, set->pool, right);
        else
                return 0;
 
-       set = bt_pinlatch (bt, right);
-    bt_lockpage(BtLockRead, set);
+       set->latch = bt_pinlatch (bt, right);
+    bt_lockpage(BtLockRead, set->latch);
 
-       memcpy (bt->cursor, page, bt->mgr->page_size);
+       memcpy (bt->cursor, set->page, bt->mgr->page_size);
 
-       bt_unlockpage(BtLockRead, set);
-       bt_unpinlatch (set);
-       bt_unpinpool (pool);
+       bt_unlockpage(BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        slot = 0;
   } while( 1 );
 
@@ -2423,39 +2559,39 @@ uint bt_tod(BtDb *bt, uint slot)
 void bt_latchaudit (BtDb *bt)
 {
 ushort idx, hashidx;
-BtLatchSet *set;
-BtPool *pool;
-BtPage page;
-uid page_no;
+BtPageSet set[1];
 
 #ifdef unix
        for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
-               set = bt->mgr->latchsets + idx;
-               if( set->pin ) {
-                       fprintf(stderr, "latchset %d pinned\n", idx);
-                       set->pin = 0;
+               set->latch = bt->mgr->latchsets + idx;
+               if( set->latch->pin ) {
+                       fprintf(stderr, "latchset %d pinned for page %.6x\n", idx, set->latch->page_no);
+                       set->latch->pin = 0;
                }
        }
 
        for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
-         if( *(uint *)bt->mgr->latchmgr->table[hashidx].latch )
-               fprintf(stderr, "latchmgr locked\n");
          if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
-               set = bt->mgr->latchsets + idx;
-               if( set->hash != hashidx )
+               set->latch = bt->mgr->latchsets + idx;
+               if( set->latch->hash != hashidx )
                        fprintf(stderr, "latchset %d wrong hashidx\n", idx);
-               if( set->pin )
-                       fprintf(stderr, "latchset %d pinned\n", idx);
-         } while( idx = set->next );
+               if( set->latch->pin )
+                       fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, set->latch->page_no);
+         } while( idx = set->latch->next );
        }
-       page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
-
-       while( page_no ) {
-               fprintf(stderr, "free: %.6x\n", (uint)page_no);
-               pool = bt_pinpool (bt, page_no);
-               page = bt_page (bt, pool, page_no);
-           page_no = bt_getid(page->right);
-               bt_unpinpool (pool);
+
+       set->page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
+
+       while( set->page_no ) {
+               fprintf(stderr, "free: %.6x\n", (uint)set->page_no);
+
+               if( set->pool = bt_pinpool (bt, set->page_no) )
+                       set->page = bt_page (bt, set->pool, set->page_no);
+               else
+                       return;
+
+           set->page_no = bt_getid(set->page->right);
+               bt_unpinpool (set->pool);
        }
 #endif
 }
@@ -2481,10 +2617,8 @@ uid next, page_no = LEAF_page;   // start on first page of leaves
 unsigned char key[256];
 ThreadArg *args = arg;
 int ch, len = 0, slot;
-BtLatchSet *set;
+BtPageSet set[1];
 time_t tod[1];
-BtPool *pool;
-BtPage page;
 BtKey ptr;
 BtDb *bt;
 FILE *in;
@@ -2570,38 +2704,49 @@ FILE *in;
                break;
 
        case 's':
-               len = key[0] = 0;
-
-               fprintf(stderr, "started reading\n");
-
-               if( slot = bt_startkey (bt, key, len) )
-                 slot--;
-               else
-                 fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
-
-               while( slot = bt_nextkey (bt, slot) ) {
-                       ptr = bt_key(bt, slot);
-                       fwrite (ptr->key, ptr->len, 1, stdout);
-                       fputc ('\n', stdout);
-               }
+               fprintf(stderr, "started scanning\n");
+               do {
+                       if( set->pool = bt_pinpool (bt, page_no) )
+                               set->page = bt_page (bt, set->pool, page_no);
+                       else
+                               break;
+                       set->latch = bt_pinlatch (bt, page_no);
+                       bt_lockpage (BtLockRead, set->latch);
+                       next = bt_getid (set->page->right);
+                       cnt += set->page->act;
+
+                       for( slot = 0; slot++ < set->page->cnt; )
+                        if( next || slot < set->page->cnt )
+                         if( !slotptr(set->page, slot)->dead ) {
+                               ptr = keyptr(set->page, slot);
+                               fwrite (ptr->key, ptr->len, 1, stdout);
+                               fputc ('\n', stdout);
+                         }
+
+                       bt_unlockpage (BtLockRead, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
+               } while( page_no = next );
 
+               cnt--;  // remove stopper key
+               fprintf(stderr, " Total keys read %d\n", cnt);
                break;
 
        case 'c':
-               fprintf(stderr, "started reading\n");
+               fprintf(stderr, "started counting\n");
 
                do {
-                       if( pool = bt_pinpool (bt, page_no) )
-                               page = bt_page (bt, pool, page_no);
+                       if( set->pool = bt_pinpool (bt, page_no) )
+                               set->page = bt_page (bt, set->pool, page_no);
                        else
                                break;
-                       set = bt_pinlatch (bt, page_no);
-                       bt_lockpage (BtLockRead, set);
-                       cnt += page->act;
-                       next = bt_getid (page->right);
-                       bt_unlockpage (BtLockRead, set);
-                       bt_unpinlatch (set);
-                       bt_unpinpool (pool);
+                       set->latch = bt_pinlatch (bt, page_no);
+                       bt_lockpage (BtLockRead, set->latch);
+                       cnt += set->page->act;
+                       next = bt_getid (set->page->right);
+                       bt_unlockpage (BtLockRead, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
                } while( page_no = next );
 
                cnt--;  // remove stopper key
index 73d7541c013d260c76edc2c917e4b0a4dde15fff..5f003793424653ad07bc9ee6bf743969f80c2a65 100644 (file)
@@ -1,5 +1,6 @@
 // btree version threads2i sched_yield version
-// 30 JAN 2014
+//     with reworked bt_deletekey code
+// 12 FEB 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -49,6 +50,7 @@ REDISTRIBUTION OF THIS SOFTWARE.
 
 #include <memory.h>
 #include <string.h>
+#include <stddef.h>
 
 typedef unsigned long long     uid;
 
@@ -117,7 +119,7 @@ typedef struct {
 typedef struct {
        BtSpinLatch readwr[1];          // read/write page lock
        BtSpinLatch access[1];          // Access Intent/Page delete
-       BtSpinLatch parent[1];          // adoption of foster children
+       BtSpinLatch parent[1];          // Posting of fence key in parent
        BtSpinLatch busy[1];            // slot is being moved between chains
        volatile ushort next;           // next entry in hash table chain
        volatile ushort prev;           // prev entry in hash table chain
@@ -161,14 +163,19 @@ typedef struct {
 //     It is immediately followed
 //     by the BtSlot array of keys.
 
-typedef struct Page {
+typedef struct BtPage_ {
        uint cnt;                                       // count of keys in page
        uint act;                                       // count of active keys
        uint min;                                       // next key offset
-       unsigned char bits;                     // page size in bits
-       unsigned char lvl:7;            // level of page
+       unsigned char bits:7;           // page size in bits
+       unsigned char free:1;           // page is on free chain
+       unsigned char lvl:4;            // level of page
+       unsigned char kill:1;           // page is being deleted
        unsigned char dirty:1;          // page has deleted keys
+       unsigned char posted:1;         // page fence is posted
+       unsigned char goright:1;        // page is being deleted, go right
        unsigned char right[BtId];      // page number to right
+       unsigned char fence[256];       // page fence key
 } *BtPage;
 
 //     The memory mapping pool table buffer manager entry
@@ -186,10 +193,19 @@ typedef struct {
 #endif
 } BtPool;
 
+//  The loadpage interface object
+
+typedef struct {
+       uid page_no;            // current page number
+       BtPage page;            // current page pointer
+       BtPool *pool;           // current page pool
+       BtLatchSet *latch;      // current page latch set
+} BtPageSet;
+
 //     structure for latch manager on ALLOC_page
 
 typedef struct {
-       struct Page alloc[2];           // next & free page_nos in right ptr
+       struct BtPage_ alloc[2];        // next & free page_nos in right ptr
        BtSpinLatch lock[1];            // allocation area lite latch
        ushort latchdeployed;           // highest number of latch entries deployed
        ushort nlatchpage;                      // number of latch pages at BT_latch
@@ -231,13 +247,8 @@ typedef struct {
        BtPage cursor;          // cached frame for start/next (never mapped)
        BtPage frame;           // spare frame for the page split (never mapped)
        BtPage zero;            // page frame for zeroes at end of file
-       BtPage page;            // current page
-       uid page_no;            // current page number  
        uid cursor_page;        // current cursor page number   
-       BtLatchSet *set;        // current page latch set
-       BtPool *pool;           // current page pool
        unsigned char *mem;     // frame, cursor, page memory buffer
-       int parent;                     // last loadpage was from a parent level
        int found;                      // last delete or insert was found
        int err;                        // last error
 } BtDb;
@@ -262,9 +273,7 @@ extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_nextkey   (BtDb *bt, uint slot);
 
 //     internal functions
-BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
-uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
-BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
+BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence);
 
 //     manager functions
 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
@@ -310,8 +319,8 @@ extern uint bt_tod (BtDb *bt, uint slot);
 //     one with two keys.
 
 //     Deleted keys are marked with a dead bit until
-//     page cleanup The fence key for a node is always
-//     present, even after deletion and cleanup.
+//     page cleanup. The fence key for a node is
+//     present in a special array
 
 //  Groups of pages called segments from the btree are optionally
 //  cached with a memory mapped pool. A hash table is used to keep
@@ -326,12 +335,13 @@ extern uint bt_tod (BtDb *bt, uint slot);
 //  Page 0 is dedicated to lock for new page extensions,
 //     and chains empty pages together for reuse.
 
-//     The ParentModification lock on a node is obtained to prevent resplitting
-//     or deleting a node before its fence is posted into its upper level.
+//     The ParentModification lock on a node is obtained to serialize posting
+//     or changing the fence key for a node.
 
 //     Empty pages are chained together through the ALLOC page and reused.
 
 //     Access macros to address slot and key values from the page
+//     Page slots use 1 based indexing.
 
 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
@@ -872,13 +882,12 @@ SYSTEM_INFO sysinfo[1];
        latchmgr->alloc->bits = mgr->page_bits;
 
        for( lvl=MIN_lvl; lvl--; ) {
-               slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3;
+               slotptr(latchmgr->alloc, 1)->off = offsetof(struct BtPage_, fence);
                bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
-               key = keyptr(latchmgr->alloc, 1);
-               key->len = 2;                   // create stopper key
-               key->key[0] = 0xff;
-               key->key[1] = 0xff;
-               latchmgr->alloc->min = mgr->page_size - 3;
+               latchmgr->alloc->fence[0] = 2;                  // create stopper key
+               latchmgr->alloc->fence[1] = 0xff;
+               latchmgr->alloc->fence[2] = 0xff;
+               latchmgr->alloc->min = mgr->page_size;
                latchmgr->alloc->lvl = lvl;
                latchmgr->alloc->cnt = 1;
                latchmgr->alloc->act = 1;
@@ -1071,7 +1080,7 @@ int flag;
 
 #ifdef unix
        flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
-       pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
+       pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED | MAP_POPULATE, bt->mgr->idx, off);
        if( pool->map == MAP_FAILED )
                return bt->err = BTERR_map;
 
@@ -1304,10 +1313,8 @@ void bt_unlockpage(BtLock mode, BtLatchSet *set)
 
 uid bt_newpage(BtDb *bt, BtPage page)
 {
-BtLatchSet *set;
-BtPool *pool;
+BtPageSet set[1];
 uid new_page;
-BtPage pmap;
 int reuse;
 
        //      lock allocation page
@@ -1318,12 +1325,13 @@ int reuse;
        // else allocate empty page
 
        if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
-               if( pool = bt_pinpool (bt, new_page) )
-                       pmap = bt_page (bt, pool, new_page);
+               if( set->pool = bt_pinpool (bt, new_page) )
+                       set->page = bt_page (bt, set->pool, new_page);
                else
                        return 0;
-               bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(pmap->right));
-               bt_unpinpool (pool);
+
+               bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(set->page->right));
+               bt_unpinpool (set->pool);
                reuse = 1;
        } else {
                new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
@@ -1339,7 +1347,6 @@ int reuse;
        if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
        {
                // use zero buffer to write zeros
-               memset(bt->zero, 0, bt->mgr->page_size);
                if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
                        return bt->err = BTERR_wrt, 0;
        }
@@ -1347,13 +1354,13 @@ int reuse;
        //      bring new page into pool and copy page.
        //      this will extend the file into the new pages.
 
-       if( pool = bt_pinpool (bt, new_page) )
-               pmap = bt_page (bt, pool, new_page);
+       if( set->pool = bt_pinpool (bt, new_page) )
+               set->page = bt_page (bt, set->pool, new_page);
        else
                return 0;
 
-       memcpy(pmap, page, bt->mgr->page_size);
-       bt_unpinpool (pool);
+       memcpy(set->page, page, bt->mgr->page_size);
+       bt_unpinpool (set->pool);
 #endif
        // unlock allocation latch and return new page no
 
@@ -1363,142 +1370,161 @@ int reuse;
 
 //  find slot in page for given key at a given level
 
-int bt_findslot (BtDb *bt, unsigned char *key, uint len)
+int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
 {
-uint diff, higher = bt->page->cnt, low = 1, slot;
-uint good = 0;
+uint diff, higher = set->page->cnt, low = 1, slot;
 
-       //      if no right link
        //        make stopper key an infinite fence value
-       //        by setting the good flag
 
-       if( bt_getid (bt->page->right) )
+       if( bt_getid (set->page->right) )
                higher++;
-       else
-               good++;
 
-       //      low is the next candidate.
+       //      low is the lowest candidate.
        //  loop ends when they meet
 
-       //  if good, higher is already
+       //  higher is already
        //      tested as .ge. the given key.
 
        while( diff = higher - low ) {
                slot = low + ( diff >> 1 );
-               if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
+               if( keycmp (keyptr(set->page, slot), key, len) < 0 )
                        low = slot + 1;
                else
-                       higher = slot, good++;
+                       higher = slot;
        }
 
+       if( higher <= set->page->cnt )
+               return higher;
+
+       //      if leaf page, compare against fence value
+
        //      return zero if key is on right link page
+       //      or return slot beyond last key
+
+       if( set->page->lvl || keycmp ((BtKey)set->page->fence, key, len) < 0 )
+               return 0;
 
-       return good ? higher : 0;
+       return higher;
 }
 
 //  find and load page at given level for given key
 //     leave page rd or wr locked as requested
 
-int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
+int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, uint lock)
 {
 uid page_no = ROOT_page, prevpage = 0;
-BtLatchSet *set, *prevset;
 uint drill = 0xff, slot;
+BtLatchSet *prevlatch;
 uint mode, prevmode;
 BtPool *prevpool;
-int parent = 1;
 
   //  start at root of btree and drill down
 
-  bt->set = NULL;
-
   do {
        // determine lock mode of drill level
        mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
 
-       bt->set = bt_pinlatch (bt, page_no);
-       bt->page_no = page_no;
+       set->latch = bt_pinlatch (bt, page_no);
+       set->page_no = page_no;
 
        // pin page contents
 
-       if( bt->pool = bt_pinpool (bt, page_no) )
-               bt->page = bt_page (bt, bt->pool, page_no);
+       if( set->pool = bt_pinpool (bt, page_no) )
+               set->page = bt_page (bt, set->pool, page_no);
        else
                return 0;
 
        // obtain access lock using lock chaining with Access mode
 
        if( page_no > ROOT_page )
-         bt_lockpage(BtLockAccess, bt->set);
+         bt_lockpage(BtLockAccess, set->latch);
 
        //      release & unpin parent page
 
        if( prevpage ) {
-         bt_unlockpage(prevmode, prevset);
-         bt_unpinlatch (prevset);
+         bt_unlockpage(prevmode, prevlatch);
+         bt_unpinlatch (prevlatch);
          bt_unpinpool (prevpool);
          prevpage = 0;
        }
 
        // obtain read lock using lock chaining
 
-       bt_lockpage(mode, bt->set);
+       bt_lockpage(mode, set->latch);
 
        if( page_no > ROOT_page )
-         bt_unlockpage(BtLockAccess, bt->set);
+         bt_unlockpage(BtLockAccess, set->latch);
 
        // re-read and re-lock root after determining actual level of root
 
-       if( bt->page->lvl != drill) {
-               if ( bt->page_no != ROOT_page )
+       if( set->page->lvl != drill) {
+               if ( set->page_no != ROOT_page )
                        return bt->err = BTERR_struct, 0;
                        
-               drill = bt->page->lvl;
+               drill = set->page->lvl;
 
                if( lock == BtLockWrite && drill == lvl ) {
-                 bt_unlockpage(mode, bt->set);
-                 bt_unpinlatch (bt->set);
-                 bt_unpinpool (bt->pool);
+                 bt_unlockpage(mode, set->latch);
+                 bt_unpinlatch (set->latch);
+                 bt_unpinpool (set->pool);
                  continue;
                }
        }
 
+       prevpage = set->page_no;
+       prevlatch = set->latch;
+       prevpool = set->pool;
+       prevmode = mode;
+
+       //      if page is being deleted and we should continue right
+
+       if( set->page->kill && set->page->goright ) {
+               page_no = bt_getid (set->page->right);
+               continue;
+       }
+
+       //      otherwise, wait for deleted node to clear
+
+       if( set->page->kill ) {
+               bt_unlockpage(mode, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
+               page_no = ROOT_page;
+               prevpage = 0;
+               drill = 0xff;
+#ifdef unix
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
+       
        //  find key on page at this level
        //  and descend to requested level
 
-       if( slot = bt_findslot (bt, key, len) ) {
+       if( slot = bt_findslot (set, key, len) ) {
          if( drill == lvl )
-               return bt->parent = parent, slot;
+               return slot;
+
+         if( slot > set->page->cnt )
+               return bt->err = BTERR_struct;
 
-         while( slotptr(bt->page, slot)->dead )
-               if( slot++ < bt->page->cnt )
+         while( slotptr(set->page, slot)->dead )
+               if( slot++ < set->page->cnt )
                        continue;
-               else {
-                       page_no = bt_getid(bt->page->right);
-                       parent = 0;
-                       goto slideright;
-               }
+               else
+                       return bt->err = BTERR_struct, 0;
 
-         page_no = bt_getid(slotptr(bt->page, slot)->id);
-         parent = 1;
+         page_no = bt_getid(slotptr(set->page, slot)->id);
          drill--;
+         continue;
        }
 
        //  or slide right into next page
 
-       else {
-               page_no = bt_getid(bt->page->right);
-               parent = 0;
-       }
-
-       //  continue down / right using overlapping locks
-       //  to protect pages being split.
+       page_no = bt_getid(set->page->right);
 
-slideright:
-       prevpage = bt->page_no;
-       prevpool = bt->pool;
-       prevset = bt->set;
-       prevmode = mode;
   } while( page_no );
 
   // return error on end of right chain
@@ -1507,189 +1533,287 @@ slideright:
   return 0;    // return error
 }
 
-//  remove empty page from the B-tree
-//     by pulling our right node left over ourselves
+// drill down fixing fence values for left sibling tree
 
-//  call with bt->page, etc, set to page's locked parent
-//     returns with page locked.
+//  call with set write locked
+//     return with set unlocked & unpinned.
 
-BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
+BTERR bt_fixfences (BtDb *bt, BtPageSet *set, unsigned char *newfence)
 {
-BtLatchSet *rset, *pset, *rpset;
-BtPool *rpool, *ppool, *rppool;
-BtPage rpage, ppage, rppage;
-uid right, parent, rparent;
-BtKey ptr;
-uint idx;
+unsigned char oldfence[256];
+BtPageSet next[1];
+int chk;
+
+  memcpy (oldfence, set->page->fence, 256);
+
+  while( !set->page->kill && set->page->lvl ) {
+       next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
+       next->latch = bt_pinlatch (bt, next->page_no);
+       bt_lockpage (BtLockParent, next->latch);
+       bt_lockpage (BtLockAccess, next->latch);
+       bt_lockpage (BtLockWrite, next->latch);
+       bt_unlockpage (BtLockAccess, next->latch);
+
+       if( next->pool = bt_pinpool (bt, next->page_no) )
+               next->page = bt_page (bt, next->pool, next->page_no);
+       else
+               return bt->err;
 
-       //      cache node's parent page
+       chk = keycmp ((BtKey)next->page->fence, oldfence + 1, *oldfence);
 
-       parent = bt->page_no;
-       ppage = bt->page;
-       ppool = bt->pool;
-       pset = bt->set;
+       if( chk < 0 ) {
+         next->page_no = bt_getid (next->page->right);
+         bt_unlockpage (BtLockWrite, next->latch);
+         bt_unlockpage (BtLockParent, next->latch);
+         bt_unpinlatch (next->latch);
+         bt_unpinpool (next->pool);
+         continue;
+       }
 
-       // lock and map our right page
-       //      it cannot be NULL because of the stopper
-       //      in the last right page
+       if( chk > 0 )
+               return bt->err = BTERR_struct;
 
-       bt_lockpage (BtLockWrite, set);
+       if( bt_fixfences (bt, next, newfence) )
+               return bt->err;
 
-       // if we aren't dead yet
+       break;
+  }
 
-       if( page->act )
-               goto rmergexit;
+  memcpy (set->page->fence, newfence, 256);
 
-       if( right = bt_getid (page->right) )
-         if( rpool = bt_pinpool (bt, right) )
-               rpage = bt_page (bt, rpool, right);
-         else
-               return bt->err;
-       else
-               return bt->err = BTERR_struct;
+  bt_unlockpage (BtLockWrite, set->latch);
+  bt_unlockpage (BtLockParent, set->latch);
+  bt_unpinlatch (set->latch);
+  bt_unpinpool (set->pool);
+  return 0;
+}
 
-       rset = bt_pinlatch (bt, right);
+//     return page to free list
+//     page must be delete & write locked
 
-       //      find our right neighbor
+void bt_freepage (BtDb *bt, BtPageSet *set)
+{
+       //      lock allocation page
 
-       if( ppage->act > 1 ) {
-        for( idx = slot; idx++ < ppage->cnt; )
-         if( !slotptr(ppage, idx)->dead )
-               break;
+       bt_spinwritelock (bt->mgr->latchmgr->lock);
 
-        if( idx > ppage->cnt )
-               return bt->err = BTERR_struct;
+       //      store chain in second right
+       bt_putid(set->page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+       bt_putid(bt->mgr->latchmgr->alloc[1].right, set->page_no);
+       set->page->free = 1;
 
-        //  redirect right neighbor in parent to left node
+       // unlock released page
 
-        bt_putid(slotptr(ppage,idx)->id, page_no);
-       }
+       bt_unlockpage (BtLockDelete, set->latch);
+       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
 
-       //      if parent has only our deleted page, e.g. no right neighbor
-       //      prepare to merge parent itself
+       // unlock allocation page
 
-       if( ppage->act == 1 ) {
-         if( rparent = bt_getid (ppage->right) )
-          if( rppool = bt_pinpool (bt, rparent) )
-               rppage = bt_page (bt, rppool, rparent);
-          else
-               return bt->err;
+       bt_spinreleasewrite (bt->mgr->latchmgr->lock);
+}
+
+//     remove the root level by promoting its only child
+//     call with parent and child pages
+
+BTERR bt_removeroot (BtDb *bt, BtPageSet *root, BtPageSet *child)
+{
+uid next = 0;
+
+  do {
+       if( next ) {
+         child->latch = bt_pinlatch (bt, next);
+         bt_lockpage (BtLockDelete, child->latch);
+         bt_lockpage (BtLockWrite, child->latch);
+
+         if( child->pool = bt_pinpool (bt, next) )
+               child->page = bt_page (bt, child->pool, next);
          else
-               return bt->err = BTERR_struct;
+               return bt->err;
+
+         child->page_no = next;
+       }
+
+       memcpy (root->page, child->page, bt->mgr->page_size);
+       next = bt_getid (slotptr(child->page, child->page->cnt)->id);
+       bt_freepage (bt, child);
+  } while( root->page->lvl > 1 && root->page->cnt == 1 );
+
+  bt_unlockpage (BtLockWrite, root->latch);
+  bt_unpinlatch (root->latch);
+  bt_unpinpool (root->pool);
+  return 0;
+}
+
+//  pull right page over ourselves in simple merge
+
+BTERR bt_mergeright (BtDb *bt, BtPageSet *set, BtPageSet *parent, BtPageSet *right, uint slot, uint idx)
+{
+       //  install ourselves as child page
+       //      and delete ourselves from parent
 
-         rpset = bt_pinlatch (bt, rparent);
-         bt_lockpage (BtLockWrite, rpset);
+       bt_putid (slotptr(parent->page, idx)->id, set->page_no);
+       slotptr(parent->page, slot)->dead = 1;
+       parent->page->act--;
 
-         // find our right neighbor on right parent page
+       //      collapse any empty slots
 
-         for( idx = 0; idx++ < rppage->cnt; )
-               if( !slotptr(rppage, idx)->dead ) {
-                 bt_putid (slotptr(rppage, idx)->id, page_no);
+       while( idx = parent->page->cnt - 1 )
+         if( slotptr(parent->page, idx)->dead ) {
+               *slotptr(parent->page, idx) = *slotptr(parent->page, idx + 1);
+               memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
+         } else
                  break;
-               }
 
-         if( idx > rppage->cnt )
-               return bt->err = BTERR_struct;
-       }
+       memcpy (set->page, right->page, bt->mgr->page_size);
+       bt_unlockpage (BtLockParent, right->latch);
 
-       //      now that there are no more pointers to our right node
-       //      we can wait for delete lock on it
+       bt_freepage (bt, right);
 
-       bt_lockpage(BtLockDelete, rset);
-       bt_lockpage(BtLockWrite, rset);
+       //      do we need to remove a btree level?
+       //      (leave the first page of leaves alone)
 
-       // pull contents of right page into our empty page 
+       if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
+         if( set->page->lvl )
+               return bt_removeroot (bt, parent, set);
 
-       memcpy (page, rpage, bt->mgr->page_size);
+       bt_unlockpage (BtLockWrite, parent->latch);
+       bt_unlockpage (BtLockDelete, set->latch);
+       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (parent->latch);
+       bt_unpinpool (parent->pool);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
+       return 0;
+}
 
-       // ready to release right parent lock
-       //      now that we have a new page in place
+//     remove both child and parent from the btree
+//     from the fence position in the parent
+//     call with both pages locked for writing
 
-       if( ppage->act == 1 ) {
-         bt_unlockpage (BtLockWrite, rpset);
-         bt_unpinlatch (rpset);
-         bt_unpinpool (rppool);
-       }
+BTERR bt_removeparent (BtDb *bt, BtPageSet *child, BtPageSet *parent, BtPageSet *right, BtPageSet *rparent, uint lvl)
+{
+unsigned char pagefence[256];
+uint idx;
 
-       //      add killed right block to free chain
-       //      lock latch mgr
+       //  pull right sibling over ourselves and unlock
 
-       bt_spinwritelock(bt->mgr->latchmgr->lock);
+       memcpy (child->page, right->page, bt->mgr->page_size);
 
-       //      store free chain in allocation page second right
+       bt_unlockpage (BtLockWrite, child->latch);
+       bt_unpinlatch (child->latch);
+       bt_unpinpool (child->pool);
 
-       bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
-       bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+       //  install ourselves into right link of old right page
 
-       // unlock latch mgr and right page
+       bt_putid (right->page->right, child->page_no);
+       right->page->goright = 1;       // tell bt_loadpage to go right to us
+       right->page->kill = 1;
 
-       bt_unlockpage(BtLockDelete, rset);
-       bt_unlockpage(BtLockWrite, rset);
-       bt_unpinlatch (rset);
-       bt_unpinpool (rpool);
+       bt_unlockpage (BtLockWrite, right->latch);
 
-       bt_spinreleasewrite(bt->mgr->latchmgr->lock);
+       //      remove our slot from our parent
+       //      signal to move right
 
-       // delete our obsolete fence key from our parent
+       parent->page->goright = 1;      // tell bt_loadpage to go right to rparent
+       parent->page->kill = 1;
+       parent->page->act--;
 
-       slotptr(ppage, slot)->dead = 1;
-       ppage->dirty = 1;
+       //      redirect right page pointer in right parent to us
 
-       //      if our parent now empty
-       //      remove it from the tree
+       for( idx = 0; idx++ < rparent->page->cnt; )
+         if( !slotptr(rparent->page, idx)->dead )
+               break;
 
-       if( ppage->act-- == 1 )
-         if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
-               return bt->err;
+       if( bt_getid (slotptr(rparent->page, idx)->id) != right->page_no )
+               return bt->err = BTERR_struct;
 
-rmergexit:
-       bt_unlockpage (BtLockWrite, pset);
-       bt_unpinlatch (pset);
-       bt_unpinpool (ppool);
+       bt_putid (slotptr(rparent->page, idx)->id, child->page_no);
+       bt_unlockpage (BtLockWrite, rparent->latch);
+       bt_unpinlatch (rparent->latch);
+       bt_unpinpool (rparent->pool);
 
-       bt->found = 1;
-       return bt->err = 0;
-}
+       //      free the right page
+
+       bt_lockpage (BtLockDelete, right->latch);
+       bt_lockpage (BtLockWrite, right->latch);
+       bt_freepage (bt, right);
+
+       //      save parent page fence value
 
-//  remove empty page from the B-tree
-//     try merging left first.  If no left
-//     sibling, then merge right.
+       memcpy (pagefence, parent->page->fence, 256);
+       bt_unlockpage (BtLockWrite, parent->latch);
 
-//     call with page loaded and locked,
-//  return with page locked.
+       return bt_removepage (bt, parent, lvl, pagefence);
+}
+
+//     remove page from btree
+//     call with page unlocked
+//     returns with page on free list
 
-BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
+BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence)
 {
-unsigned char fencekey[256], postkey[256];
-uint slot, idx, postfence = 0;
-BtLatchSet *lset, *pset;
-BtPool *lpool, *ppool;
-BtPage lpage, ppage;
-uid left, parent;
+BtPageSet parent[1], sibling[1], rparent[1];
+unsigned char newfence[256];
+uint slot, idx;
 BtKey ptr;
 
-       ptr = keyptr(page, page->cnt);
-       memcpy(fencekey, ptr, ptr->len + 1);
-       bt_unlockpage (BtLockWrite, set);
-
        //      load and lock our parent
 
 retry:
-       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
+       if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
                return bt->err;
 
-       parent = bt->page_no;
-       ppage = bt->page;
-       ppool = bt->pool;
-       pset = bt->set;
+       //      can we do a simple merge entirely
+       //      between siblings on the parent page?
 
-       //      wait until we are posted in our parent
+       if( slot < parent->page->cnt ) {
+               // find our right neighbor
+               //      right must exist because the stopper prevents
+               //      the rightmost page from deleting
 
-       if( !bt->parent ) {
-               bt_unlockpage (BtLockWrite, pset);
-               bt_unpinlatch (pset);
-               bt_unpinpool (ppool);
-#ifdef unix
+               for( idx = slot; idx++ < parent->page->cnt; )
+                 if( !slotptr(parent->page, idx)->dead )
+                       break;
+
+               sibling->page_no = bt_getid (slotptr (parent->page, idx)->id);
+
+               bt_lockpage (BtLockDelete, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
+
+               //      merge right if sibling shows up in
+               //  our parent and is not being killed
+
+               if( sibling->page_no == bt_getid (set->page->right) ) {
+                 sibling->latch = bt_pinlatch (bt, sibling->page_no);
+                 bt_lockpage (BtLockParent, sibling->latch);
+                 bt_lockpage (BtLockDelete, sibling->latch);
+                 bt_lockpage (BtLockWrite, sibling->latch);
+
+                 if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+                       sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+                 else
+                       return bt->err;
+
+                 if( !sibling->page->kill )
+                       return bt_mergeright(bt, set, parent, sibling, slot, idx);
+
+                 //  try again later
+
+                 bt_unlockpage (BtLockWrite, sibling->latch);
+                 bt_unlockpage (BtLockParent, sibling->latch);
+                 bt_unlockpage (BtLockDelete, sibling->latch);
+                 bt_unpinlatch (sibling->latch);
+                 bt_unpinpool (sibling->pool);
+               }
+
+               bt_unlockpage (BtLockDelete, set->latch);
+               bt_unlockpage (BtLockWrite, set->latch);
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+#ifdef linux
                sched_yield();
 #else
                SwitchToThread();
@@ -1700,35 +1824,59 @@ retry:
        //  find our left neighbor in our parent page
 
        for( idx = slot; --idx; )
-         if( !slotptr(ppage, idx)->dead )
+         if( !slotptr(parent->page, idx)->dead )
                break;
 
-       //      if no left neighbor, do right merge
+       //      if no left neighbor, delete ourselves and our parent
 
-       if( !idx )
-               return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
+       if( !idx ) {
+               bt_lockpage (BtLockAccess, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
+               bt_unlockpage (BtLockAccess, set->latch);
 
-       // lock and map our left neighbor's page
+               rparent->page_no = bt_getid (parent->page->right);
+               rparent->latch = bt_pinlatch (bt, rparent->page_no);
 
-       left = bt_getid (slotptr(ppage, idx)->id);
+               bt_lockpage (BtLockAccess, rparent->latch);
+               bt_lockpage (BtLockWrite, rparent->latch);
+               bt_unlockpage (BtLockAccess, rparent->latch);
 
-       if( lpool = bt_pinpool (bt, left) )
-               lpage = bt_page (bt, lpool, left);
-       else
-               return bt->err;
+               if( rparent->pool = bt_pinpool (bt, rparent->page_no) )
+                       rparent->page = bt_page (bt, rparent->pool, rparent->page_no);
+               else
+                       return bt->err;
+
+               if( !rparent->page->kill ) {
+                 sibling->page_no = bt_getid (set->page->right);
+                 sibling->latch = bt_pinlatch (bt, sibling->page_no);
 
-       lset = bt_pinlatch (bt, left);
-       bt_lockpage(BtLockWrite, lset);
+                 bt_lockpage (BtLockAccess, sibling->latch);
+                 bt_lockpage (BtLockWrite, sibling->latch);
+                 bt_unlockpage (BtLockAccess, sibling->latch);
 
-       //  wait until sibling is in our parent
+                 if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+                       sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+                 else
+                       return bt->err;
+
+                 if( !sibling->page->kill )
+                       return bt_removeparent (bt, set, parent, sibling, rparent, lvl+1);
+
+                 //  try again later
 
-       if( bt_getid (lpage->right) != page_no ) {
-               bt_unlockpage (BtLockWrite, pset);
-               bt_unpinlatch (pset);
-               bt_unpinpool (ppool);
-               bt_unlockpage (BtLockWrite, lset);
-               bt_unpinlatch (lset);
-               bt_unpinpool (lpool);
+                 bt_unlockpage (BtLockWrite, sibling->latch);
+                 bt_unpinlatch (sibling->latch);
+                 bt_unpinpool (sibling->pool);
+               }
+
+               bt_unlockpage (BtLockWrite, set->latch);
+               bt_unlockpage (BtLockWrite, rparent->latch);
+               bt_unpinlatch (rparent->latch);
+               bt_unpinpool (rparent->pool);
+
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
 #ifdef linux
                sched_yield();
 #else
@@ -1737,114 +1885,97 @@ retry:
                goto retry;
        }
 
-       //  since our page will have no more pointers to it,
-       //      obtain Delete lock and wait for write locks to clear
-
-       bt_lockpage(BtLockDelete, set);
-       bt_lockpage(BtLockWrite, set);
-
-       //      if we aren't dead yet,
-       //      get ready for exit
-
-       if( page->act ) {
-               bt_unlockpage(BtLockDelete, set);
-               bt_unlockpage(BtLockWrite, lset);
-               bt_unpinlatch (lset);
-               bt_unpinpool (lpool);
-               goto lmergexit;
-       }
-
-       //      are we are the fence key for our parent?
-       //      if so, grab our old fence key
+       // redirect parent to our left sibling
+       // lock and map our left sibling's page
 
-       if( postfence = slot == ppage->cnt ) {
-               ptr = keyptr (ppage, ppage->cnt);
-               memcpy(fencekey, ptr, ptr->len + 1);
-               memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
-
-               // clear out other dead slots
-
-               while( --ppage->cnt )
-                 if( slotptr(ppage, ppage->cnt)->dead )
-                       memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
-                 else
-                       break;
+       sibling->page_no = bt_getid (slotptr(parent->page, idx)->id);
+       sibling->latch = bt_pinlatch (bt, sibling->page_no);
 
-               ptr = keyptr (ppage, ppage->cnt);
-               memcpy(postkey, ptr, ptr->len + 1);
-       } else
-               slotptr(ppage,slot)->dead = 1;
+       //      wait our turn on fence key maintenance
 
-       ppage->dirty = 1;
-       ppage->act--;
+       bt_lockpage(BtLockParent, sibling->latch);
+       bt_lockpage(BtLockAccess, sibling->latch);
+       bt_lockpage(BtLockWrite, sibling->latch);
+       bt_unlockpage(BtLockAccess, sibling->latch);
 
-       //      push our right neighbor pointer to our left
-
-       memcpy (lpage->right, page->right, BtId);
+       if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+               sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+       else
+               return bt->err;
 
-       //      add ourselves to free chain
-       //      lock latch mgr
+       //  wait until left sibling is in our parent
 
-       bt_spinwritelock(bt->mgr->latchmgr->lock);
+       if( bt_getid (sibling->page->right) != set->page_no ) {
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unlockpage (BtLockWrite, sibling->latch);
+               bt_unlockpage (BtLockParent, sibling->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+               bt_unpinlatch (sibling->latch);
+               bt_unpinpool (sibling->pool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               goto retry;
+       }
 
-       //      store free chain in allocation page second right
-       bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
-       bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
+       //      delete our left sibling from parent
 
-       // unlock latch mgr and pages
+       slotptr(parent->page,idx)->dead = 1;
+       parent->page->dirty = 1;
+       parent->page->act--;
 
-       bt_spinreleasewrite(bt->mgr->latchmgr->lock);
-       bt_unlockpage(BtLockWrite, lset);
-       bt_unpinlatch (lset);
-       bt_unpinpool (lpool);
+       //      redirect our parent slot to our left sibling
 
-       // release our node's delete lock
+       bt_putid (slotptr(parent->page, slot)->id, sibling->page_no);
+       memcpy (sibling->page->right, set->page->right, BtId);
 
-       bt_unlockpage(BtLockDelete, set);
+       //      collapse dead slots from parent
 
-lmergexit:
-       bt_unlockpage (BtLockWrite, pset);
-       bt_unpinpool (ppool);
+       while( idx = parent->page->cnt - 1 )
+         if( slotptr(parent->page, idx)->dead ) {
+               *slotptr(parent->page, idx) = *slotptr(parent->page, parent->page->cnt);
+               memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
+         } else
+                 break;
 
-       //  do we need to post parent's fence key in its parent?
+       //  free our original page
 
-       if( !postfence || parent == ROOT_page ) {
-               bt_unpinlatch (pset);
-               bt->found = 1;
-               return bt->err = 0;
-       }
+       bt_lockpage (BtLockDelete, set->latch);
+       bt_lockpage (BtLockWrite, set->latch);
+       bt_freepage (bt, set);
 
-       //      interlock parent fence post
+       //      go down the left node's fence keys to the leaf level
+       //      and update the fence keys in each page
 
-       bt_lockpage (BtLockParent, pset);
+       memcpy (newfence, parent->page->fence, 256);
 
-       //      load parent's parent page
-posttry:
-       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
+       if( bt_fixfences (bt, sibling, newfence) )
                return bt->err;
 
-       if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
-         if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
-               return bt->err;
-         else
-               goto posttry;
+       //  promote sibling as new root?
 
-       page = bt->page;
+       if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
+        if( sibling->page->lvl ) {
+         sibling->latch = bt_pinlatch (bt, sibling->page_no);
+         bt_lockpage (BtLockDelete, sibling->latch);
+         bt_lockpage (BtLockWrite, sibling->latch);
 
-       page->min -= *postkey + 1;
-       ((unsigned char *)page)[page->min] = *postkey;
-       memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
-       slotptr(page, slot)->off = page->min;
+         if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+               sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+         else
+               return bt->err;
 
-       bt_unlockpage (BtLockParent, pset);
-       bt_unpinlatch (pset);
+         return bt_removeroot (bt, parent, sibling);
+        }
 
-       bt_unlockpage (BtLockWrite, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage (BtLockWrite, parent->latch);
+       bt_unpinlatch (parent->latch);
+       bt_unpinpool (parent->pool);
 
-       bt->found = 1;
-       return bt->err = 0;
+       return 0;
 }
 
 //  find and delete key on page by marking delete flag bit
@@ -1852,65 +1983,79 @@ posttry:
 
 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
 {
-BtLatchSet *set;
-BtPool *pool;
-BtPage page;
-uid page_no;
+unsigned char pagefence[256];
+uint slot, idx, found;
+BtPageSet set[1];
 BtKey ptr;
-uint slot;
 
-       if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
+       if( slot = bt_loadpage (bt, set, key, len, 0, BtLockWrite) )
+               ptr = keyptr(set->page, slot);
+       else
                return bt->err;
 
-       page_no = bt->page_no;
-       page = bt->page;
-       pool = bt->pool;
-       set = bt->set;
-
        // if key is found delete it, otherwise ignore request
 
-       ptr = keyptr(page, slot);
+       if( found = slot <= set->page->cnt )
+         if( found = !keycmp (ptr, key, len) )
+               if( found = slotptr(set->page, slot)->dead == 0 ) {
+                 slotptr(set->page,slot)->dead = 1;
+                 set->page->dirty = 1;
+                 set->page->act--;
+
+                 // collapse empty slots
 
-       if( bt->found = !keycmp (ptr, key, len) )
-         if( bt->found = slotptr(page, slot)->dead == 0 ) {
-               slotptr(page,slot)->dead = 1;
-                 if( slot < page->cnt )
-                       page->dirty = 1;
-                 if( !--page->act )
-                       if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
-                         return bt->err;
+                 while( idx = set->page->cnt - 1 )
+                       if( slotptr(set->page, idx)->dead ) {
+                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
+                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
+                       } else
+                               break;
                }
 
-       bt_unlockpage(BtLockWrite, set);
-       bt_unpinlatch (set);
-       bt_unpinpool (pool);
-       return bt->err = 0;
+       if( set->page->act ) {
+               bt_unlockpage(BtLockWrite, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
+               return bt->found = found, 0;
+       }
+
+       memcpy (pagefence, set->page->fence, 256);
+       set->page->kill = 1;
+
+       bt_unlockpage (BtLockWrite, set->latch);
+
+       if( bt_removepage (bt, set, 0, pagefence) )
+               return bt->err;
+
+       bt->found = found;
+       return 0;
 }
 
 //     find key in leaf level and return row-id
 
 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
 {
+BtPageSet set[1];
 uint  slot;
 BtKey ptr;
 uid id;
 
-       if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
-               ptr = keyptr(bt->page, slot);
+       if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
+               ptr = keyptr(set->page, slot);
        else
                return 0;
 
        // if key exists, return row-id
        //      otherwise return 0
 
-       if( slot <= bt->page->cnt && !keycmp (ptr, key, len) )
-               id = bt_getid(slotptr(bt->page,slot)->id);
+       if( !keycmp (ptr, key, len) )
+               id = bt_getid(slotptr(set->page,slot)->id);
        else
                id = 0;
 
-       bt_unlockpage (BtLockRead, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage (BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        return id;
 }
 
@@ -1921,7 +2066,7 @@ uid id;
 
 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
 {
-uint nxt = bt->mgr->page_size;
+uint nxt = bt->mgr->page_size, off;
 uint cnt = 0, idx = 0;
 uint max = page->cnt;
 uint newslot = max;
@@ -1944,28 +2089,31 @@ BtKey key;
        page->act = 0;
 
        // try cleaning up page first
-
-       // always leave fence key in the array
-       // otherwise, remove deleted key
+       // by removing deleted keys
 
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 1;
-               if( cnt < max && slotptr(bt->frame,cnt)->dead )
+               if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
-               // copy key
+               // if its not the fence key,
+               // copy the key across
 
-               key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)page + nxt, key, key->len + 1);
+               off = slotptr(bt->frame,cnt)->off;
+
+               if( off >= sizeof(*page) ) {
+                       key = keyptr(bt->frame, cnt);
+                       off = nxt -= key->len + 1;
+                       memcpy ((unsigned char *)page + nxt, key, key->len + 1);
+               }
 
                // copy slot
+
                memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
-               if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
-                       page->act++;
                slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
-               slotptr(page, idx)->off = nxt;
+               slotptr(page, idx)->off = off;
+               page->act++;
        }
 
        page->min = nxt;
@@ -1979,202 +2127,192 @@ BtKey key;
        return 0;
 }
 
-//     add key to current page
-//     page must already be writelocked
-
-void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
-{
-uint idx;
-
-       // find next available dead slot and copy key onto page
-
-       for( idx = slot; idx < page->cnt; idx++ )
-         if( slotptr(page, idx)->dead )
-               break;
-
-       if( idx == page->cnt )
-               idx++, page->cnt++;
-
-       page->act++;
+// split the root and raise the height of the btree
 
-       // now insert key into array before slot
-
-       while( idx > slot )
-               *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
-
-       page->min -= len + 1;
-       ((unsigned char *)page)[page->min] = len;
-       memcpy ((unsigned char *)page + page->min +1, key, len );
-
-       bt_putid(slotptr(page,slot)->id, id);
-       slotptr(page, slot)->off = page->min;
-       slotptr(page, slot)->tod = tod;
-       slotptr(page, slot)->dead = 0;
-}
-
-BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
+BTERR bt_splitroot(BtDb *bt, BtPageSet *root, uid page_no2)
 {
 uint nxt = bt->mgr->page_size;
-BtPage root = bt->page;
+unsigned char leftkey[256];
 uid new_page;
 
        //  Obtain an empty page to use, and copy the current
-       //  root contents into it
+       //  root contents into it, e.g. lower keys
+
+       memcpy (leftkey, root->page->fence, 256);
+       root->page->posted = 1;
 
-       if( !(new_page = bt_newpage(bt, root)) )
+       if( !(new_page = bt_newpage(bt, root->page)) )
                return bt->err;
 
        // preserve the page info at the bottom
-       // and set rest to zero
+       // of higher keys and set rest to zero
 
-       memset(root+1, 0, bt->mgr->page_size - sizeof(*root));
+       memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
+       memset(root->page->fence, 0, 256);
+       root->page->fence[0] = 2;
+       root->page->fence[1] = 0xff;
+       root->page->fence[2] = 0xff;
 
-       // insert first key on newroot page
+       // insert lower keys page fence key on newroot page
 
        nxt -= *leftkey + 1;
-       memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
-       bt_putid(slotptr(root, 1)->id, new_page);
-       slotptr(root, 1)->off = nxt;
+       memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
+       bt_putid(slotptr(root->page, 1)->id, new_page);
+       slotptr(root->page, 1)->off = nxt;
        
-       // insert second key (stopper key) on newroot page
+       // insert stopper key on newroot page
        // and increase the root height
 
-       nxt -= 3;
-       *((unsigned char *)root + nxt) = 2;
-       memset ((unsigned char *)root + nxt + 1, 0xff, 2);
-       bt_putid(slotptr(root, 2)->id, page_no2);
-       slotptr(root, 2)->off = nxt;
+       bt_putid(slotptr(root->page, 2)->id, page_no2);
+       slotptr(root->page, 2)->off = offsetof(struct BtPage_, fence);
 
-       bt_putid(root->right, 0);
-       root->min = nxt;                // reset lowest used offset and key count
-       root->cnt = 2;
-       root->act = 2;
-       root->lvl++;
+       bt_putid(root->page->right, 0);
+       root->page->min = nxt;          // reset lowest used offset and key count
+       root->page->cnt = 2;
+       root->page->act = 2;
+       root->page->lvl++;
 
-       // release and unpin root (bt->page)
+       // release and unpin root
 
-       bt_unlockpage(BtLockWrite, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage(BtLockWrite, root->latch);
+       bt_unpinlatch (root->latch);
+       bt_unpinpool (root->pool);
        return 0;
 }
 
 //  split already locked full node
-//     return unlocked and unpinned.
+//     return unlocked.
 
-BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
+BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
 {
-uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
-unsigned char rightkey[256], leftkey[256];
-uint tod = time(NULL);
-uint lvl = page->lvl;
-uid new_page;
+uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size, off;
+unsigned char fencekey[256];
+uint lvl = set->page->lvl;
+uid right;
 BtKey key;
 
-       //      initialize frame buffer for right node
+       //  split higher half of keys to bt->frame
 
        memset (bt->frame, 0, bt->mgr->page_size);
-       max = page->cnt;
+       max = set->page->cnt;
        cnt = max / 2;
        idx = 0;
 
-       //  split higher half of keys to bt->frame
-
        while( cnt++ < max ) {
-               key = keyptr(page, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
-               memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
-               if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
-                       bt->frame->act++;
-               slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
-               slotptr(bt->frame, idx)->off = nxt;
+               if( !lvl || cnt < max ) {
+                       key = keyptr(set->page, cnt);
+                       off = nxt -= key->len + 1;
+                       memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
+               } else
+                       off = offsetof(struct BtPage_, fence);
+
+               memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId);
+               slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod;
+               slotptr(bt->frame, idx)->off = off;
+               bt->frame->act++;
        }
 
-       // transfer right link node to new right node
-
-       if( page_no > ROOT_page )
-               memcpy (bt->frame->right, page->right, BtId);
+       if( set->page_no == ROOT_page )
+               bt->frame->posted = 1;
 
+       memcpy (bt->frame->fence, set->page->fence, 256);
        bt->frame->bits = bt->mgr->page_bits;
        bt->frame->min = nxt;
        bt->frame->cnt = idx;
        bt->frame->lvl = lvl;
 
-       //      get new free page and write right frame to it.
+       // link right node
 
-       if( !(new_page = bt_newpage(bt, bt->frame)) )
-               return bt->err;
+       if( set->page_no > ROOT_page )
+               memcpy (bt->frame->right, set->page->right, BtId);
 
-       //      remember fence key for new right page to add
-       //      as right sibling to the left node
+       //      get new free page and write higher keys to it.
 
-       key = keyptr(bt->frame, idx);
-       memcpy (rightkey, key, key->len + 1);
+       if( !(right = bt_newpage(bt, bt->frame)) )
+               return bt->err;
 
        //      update lower keys to continue in old page
 
-       memcpy (bt->frame, page, bt->mgr->page_size);
-       memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
+       memcpy (bt->frame, set->page, bt->mgr->page_size);
+       memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
        nxt = bt->mgr->page_size;
-       page->dirty = 0;
-       page->act = 0;
+       set->page->posted = 0;
+       set->page->dirty = 0;
+       set->page->act = 0;
        cnt = 0;
        idx = 0;
 
        //  assemble page of smaller keys
-       //      to remain in the old page
 
        while( cnt++ < max / 2 ) {
                key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)page + nxt, key, key->len + 1);
-               memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
-               if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
-                       page->act++;
-               slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
-               slotptr(page, idx)->off = nxt;
-       }
 
-       // finalize left page and save fence key
+               if( !lvl || cnt < max / 2 ) {
+                       off = nxt -= key->len + 1;
+                       memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
+               } else 
+                       off = offsetof(struct BtPage_, fence);
 
-       memcpy(leftkey, key, key->len + 1);
-       page->min = nxt;
-       page->cnt = idx;
+               memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
+               slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod;
+               slotptr(set->page, idx)->off = off;
+               set->page->act++;
+       }
+
+       // install fence key for smaller key page
 
-       //      link new right page
+       memset(set->page->fence, 0, 256);
+       memcpy(set->page->fence, key, key->len + 1);
 
-       bt_putid (page->right, new_page);
+       bt_putid(set->page->right, right);
+       set->page->min = nxt;
+       set->page->cnt = idx;
 
        // if current page is the root page, split it
 
-       if( page_no == ROOT_page )
-               return bt_splitroot (bt, leftkey, new_page);
+       if( set->page_no == ROOT_page )
+               return bt_splitroot (bt, set, right);
 
-       //  obtain ParentModification lock for current page
+       bt_unlockpage (BtLockWrite, set->latch);
 
-       bt_lockpage (BtLockParent, set);
+       // insert new fences in their parent pages
 
-       //  release wr lock on our page.
-       //      this will keep out another SMO
+       while( 1 ) {
+               bt_lockpage (BtLockParent, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
 
-       bt_unlockpage (BtLockWrite, set);
+               memcpy (fencekey, set->page->fence, 256);
+               right = bt_getid (set->page->right);
 
-       //  insert key for old page (lower keys)
+               if( set->page->posted ) {
+                       bt_unlockpage (BtLockParent, set->latch);
+                       bt_unlockpage (BtLockWrite, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
+                       return 0;
+               }
 
-       if( bt_insertkey (bt, leftkey + 1, *leftkey, page_no, tod, lvl + 1) )
-               return bt->err;
+               set->page->posted = 1;
+               bt_unlockpage (BtLockWrite, set->latch);
 
-       //      switch old parent key from us to our right page
+               if( bt_insertkey (bt, fencekey+1, *fencekey, set->page_no, time(NULL), lvl+1) )
+                       return bt->err;
 
-       if( bt_insertkey (bt, rightkey + 1, *rightkey, new_page, tod, lvl + 1) )
-               return bt->err;
+               bt_unlockpage (BtLockParent, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
 
-       //      unlock and unpin
+               if( !(set->page_no = right) )
+                       break;
+
+               set->latch = bt_pinlatch (bt, right);
+
+               if( set->pool = bt_pinpool (bt, right) )
+                       set->page = bt_page (bt, set->pool, right);
+               else
+                       return bt->err;
+       }
 
-       bt_unlockpage (BtLockParent, set);
-       bt_unpinlatch (set);
-       bt_unpinpool (pool);
        return 0;
 }
 
@@ -2182,13 +2320,13 @@ BtKey key;
 
 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
 {
+BtPageSet set[1];
 uint slot, idx;
-BtPage page;
 BtKey ptr;
 
        while( 1 ) {
-               if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
-                       ptr = keyptr(bt->page, slot);
+               if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
+                       ptr = keyptr(set->page, slot);
                else
                {
                        if ( !bt->err )
@@ -2198,34 +2336,56 @@ BtKey ptr;
 
                // if key already exists, update id and return
 
-               page = bt->page;
-
-               if( !keycmp (ptr, key, len) ) {
-                       if( slotptr(page, slot)->dead )
-                               page->act++;
-                       slotptr(page, slot)->dead = 0;
-                       slotptr(page, slot)->tod = tod;
-                       bt_putid(slotptr(page,slot)->id, id);
-                       bt_unlockpage(BtLockWrite, bt->set);
-                       bt_unpinlatch (bt->set);
-                       bt_unpinpool (bt->pool);
-                       return bt->err;
+               if( slot <= set->page->cnt )
+                 if( !keycmp (ptr, key, len) ) {
+                       if( slotptr(set->page, slot)->dead )
+                               set->page->act++;
+                       slotptr(set->page, slot)->dead = 0;
+                       slotptr(set->page, slot)->tod = tod;
+                       bt_putid(slotptr(set->page,slot)->id, id);
+                       bt_unlockpage(BtLockWrite, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
+                       return 0;
                }
 
                // check if page has enough space
 
-               if( slot = bt_cleanpage (bt, bt->page, len, slot) )
+               if( slot = bt_cleanpage (bt, set->page, len, slot) )
                        break;
 
-               if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+               if( bt_splitpage (bt, set) )
                        return bt->err;
        }
 
-       bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
+       // calculate next available slot and copy key into page
+
+       set->page->min -= len + 1; // reset lowest used offset
+       ((unsigned char *)set->page)[set->page->min] = len;
+       memcpy ((unsigned char *)set->page + set->page->min +1, key, len );
+
+       for( idx = slot; idx <= set->page->cnt; idx++ )
+         if( slotptr(set->page, idx)->dead )
+               break;
+
+       // now insert key into array before slot
+
+       if( idx > set->page->cnt )
+               set->page->cnt++;
+
+       set->page->act++;
 
-       bt_unlockpage (BtLockWrite, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       while( idx > slot )
+               *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
+
+       bt_putid(slotptr(set->page,slot)->id, id);
+       slotptr(set->page, slot)->off = set->page->min;
+       slotptr(set->page, slot)->tod = tod;
+       slotptr(set->page, slot)->dead = 0;
+
+       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        return 0;
 }
 
@@ -2233,17 +2393,21 @@ BtKey ptr;
 
 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
 {
+BtPageSet set[1];
 uint slot;
 
        // cache page for retrieval
-       if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
-               memcpy (bt->cursor, bt->page, bt->mgr->page_size);
 
-       bt->cursor_page = bt->page_no;
+       if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
+         memcpy (bt->cursor, set->page, bt->mgr->page_size);
+       else
+         return 0;
+
+       bt->cursor_page = set->page_no;
 
-       bt_unlockpage(BtLockRead, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage(BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        return slot;
 }
 
@@ -2252,9 +2416,7 @@ uint slot;
 
 uint bt_nextkey (BtDb *bt, uint slot)
 {
-BtLatchSet *set;
-BtPool *pool;
-BtPage page;
+BtPageSet set[1];
 uid right;
 
   do {
@@ -2262,7 +2424,7 @@ uid right;
        while( slot++ < bt->cursor->cnt )
          if( slotptr(bt->cursor,slot)->dead )
                continue;
-         else if( right || (slot < bt->cursor->cnt) )
+         else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
                return slot;
          else
                break;
@@ -2271,19 +2433,20 @@ uid right;
                break;
 
        bt->cursor_page = right;
-       if( pool = bt_pinpool (bt, right) )
-               page = bt_page (bt, pool, right);
+
+       if( set->pool = bt_pinpool (bt, right) )
+               set->page = bt_page (bt, set->pool, right);
        else
                return 0;
 
-       set = bt_pinlatch (bt, right);
-    bt_lockpage(BtLockRead, set);
+       set->latch = bt_pinlatch (bt, right);
+    bt_lockpage(BtLockRead, set->latch);
 
-       memcpy (bt->cursor, page, bt->mgr->page_size);
+       memcpy (bt->cursor, set->page, bt->mgr->page_size);
 
-       bt_unlockpage(BtLockRead, set);
-       bt_unpinlatch (set);
-       bt_unpinpool (pool);
+       bt_unlockpage(BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        slot = 0;
   } while( 1 );
 
@@ -2311,45 +2474,56 @@ uint bt_tod(BtDb *bt, uint slot)
 void bt_latchaudit (BtDb *bt)
 {
 ushort idx, hashidx;
-BtLatchSet *set;
+BtLatchSet *latch;
 BtPool *pool;
 BtPage page;
 uid page_no;
 
 #ifdef unix
        for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
-               set = bt->mgr->latchsets + idx;
-               if( *(ushort *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent ) {
-                       fprintf(stderr, "latchset %d locked for page %6x\n", idx, set->page_no);
-                       *(ushort *)set->readwr = 0;
-                       *(ushort *)set->access = 0;
-                       *(ushort *)set->parent = 0;
+               latch = bt->mgr->latchsets + idx;
+               if( *(ushort *)latch->readwr ) {
+                       fprintf(stderr, "latchset %d r/w locked for page %.8x\n", idx, latch->page_no);
+                       *(ushort *)latch->readwr = 0;
+               }
+               if( *(ushort *)latch->access ) {
+                       fprintf(stderr, "latchset %d access locked for page %.8x\n", idx, latch->page_no);
+                       *(ushort *)latch->access = 0;
+               }
+               if( *(ushort *)latch->parent ) {
+                       fprintf(stderr, "latchset %d parent locked for page %.8x\n", idx, latch->page_no);
+                       *(ushort *)latch->parent = 0;
                }
-               if( set->pin ) {
-                       fprintf(stderr, "latchset %d pinned\n", idx);
-                       set->pin = 0;
+               if( *(ushort *)latch->busy ) {
+                       fprintf(stderr, "latchset %d busy locked for page %.8x\n", idx, latch->page_no);
+                       *(ushort *)latch->parent = 0;
+               }
+               if( latch->pin ) {
+                       fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
+                       latch->pin = 0;
                }
        }
 
        for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
-         if( *(uint *)bt->mgr->latchmgr->table[hashidx].latch )
-               fprintf(stderr, "latchmgr locked\n");
          if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
-               set = bt->mgr->latchsets + idx;
-               if( *(uint *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent )
-                       fprintf(stderr, "latchset %d locked\n", idx);
-               if( set->hash != hashidx )
+               latch = bt->mgr->latchsets + idx;
+               if( latch->hash != hashidx ) {
                        fprintf(stderr, "latchset %d wrong hashidx\n", idx);
-               if( set->pin )
-                       fprintf(stderr, "latchset %d pinned\n", idx);
-         } while( idx = set->next );
+                       latch->hash = hashidx;
+               }
+         } while( idx = latch->next );
        }
+
        page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
 
        while( page_no ) {
                fprintf(stderr, "free: %.6x\n", (uint)page_no);
-               pool = bt_pinpool (bt, page_no);
-               page = bt_page (bt, pool, page_no);
+
+               if( pool = bt_pinpool (bt, page_no) )
+                       page = bt_page (bt, pool, page_no);
+               else
+                       return;
+
            page_no = bt_getid(page->right);
                bt_unpinpool (pool);
        }
@@ -2377,10 +2551,8 @@ uid next, page_no = LEAF_page;   // start on first page of leaves
 unsigned char key[256];
 ThreadArg *args = arg;
 int ch, len = 0, slot;
-BtLatchSet *set;
+BtPageSet set[1];
 time_t tod[1];
-BtPool *pool;
-BtPage page;
 BtKey ptr;
 BtDb *bt;
 FILE *in;
@@ -2466,38 +2638,49 @@ FILE *in;
                break;
 
        case 's':
-               len = key[0] = 0;
-
-               fprintf(stderr, "started reading\n");
-
-               if( slot = bt_startkey (bt, key, len) )
-                 slot--;
-               else
-                 fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
-
-               while( slot = bt_nextkey (bt, slot) ) {
-                       ptr = bt_key(bt, slot);
-                       fwrite (ptr->key, ptr->len, 1, stdout);
-                       fputc ('\n', stdout);
-               }
+               fprintf(stderr, "started scanning\n");
+               do {
+                       if( set->pool = bt_pinpool (bt, page_no) )
+                               set->page = bt_page (bt, set->pool, page_no);
+                       else
+                               break;
+                       set->latch = bt_pinlatch (bt, page_no);
+                       bt_lockpage (BtLockRead, set->latch);
+                       next = bt_getid (set->page->right);
+                       cnt += set->page->act;
+
+                       for( slot = 0; slot++ < set->page->cnt; )
+                        if( next || slot < set->page->cnt )
+                         if( !slotptr(set->page, slot)->dead ) {
+                               ptr = keyptr(set->page, slot);
+                               fwrite (ptr->key, ptr->len, 1, stdout);
+                               fputc ('\n', stdout);
+                         }
+
+                       bt_unlockpage (BtLockRead, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
+               } while( page_no = next );
 
+               cnt--;  // remove stopper key
+               fprintf(stderr, " Total keys read %d\n", cnt);
                break;
 
        case 'c':
-               fprintf(stderr, "started reading\n");
+               fprintf(stderr, "started counting\n");
 
                do {
-                       if( pool = bt_pinpool (bt, page_no) )
-                               page = bt_page (bt, pool, page_no);
+                       if( set->pool = bt_pinpool (bt, page_no) )
+                               set->page = bt_page (bt, set->pool, page_no);
                        else
                                break;
-                       set = bt_pinlatch (bt, page_no);
-                       bt_lockpage (BtLockRead, set);
-                       cnt += page->act;
-                       next = bt_getid (page->right);
-                       bt_unlockpage (BtLockRead, set);
-                       bt_unpinlatch (set);
-                       bt_unpinpool (pool);
+                       set->latch = bt_pinlatch (bt, page_no);
+                       bt_lockpage (BtLockRead, set->latch);
+                       cnt += set->page->act;
+                       next = bt_getid (set->page->right);
+                       bt_unlockpage (BtLockRead, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
                } while( page_no = next );
 
                cnt--;  // remove stopper key
index e9aa1e7444b8d3d4dcdb28decbe3c19af162a01e..70052ee699622a94004df5f291af933b3b6cc4de 100644 (file)
@@ -1,5 +1,6 @@
 // btree version threads2j linux futex concurrency version
-// 30 JAN 2014
+//     with reworked bt_deletekey
+// 12 FEB 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -52,6 +53,7 @@ REDISTRIBUTION OF THIS SOFTWARE.
 
 #include <memory.h>
 #include <string.h>
+#include <stddef.h>
 
 typedef unsigned long long     uid;
 
@@ -149,14 +151,19 @@ typedef struct {
 //     It is immediately followed
 //     by the BtSlot array of keys.
 
-typedef struct Page {
+typedef struct BtPage_ {
        uint cnt;                                       // count of keys in page
        uint act;                                       // count of active keys
        uint min;                                       // next key offset
-       unsigned char bits;                     // page size in bits
-       unsigned char lvl:7;            // level of page
+       unsigned char bits:7;           // page size in bits
+       unsigned char free:1;           // page is on free list
+       unsigned char lvl:4;            // level of page
+       unsigned char kill:1;           // page is being deleted
        unsigned char dirty:1;          // page has deleted keys
+       unsigned char posted:1;         // page fence has posted
+       unsigned char goright:1;        // page is being killed, continue to right
        unsigned char right[BtId];      // page number to right
+       unsigned char fence[256];       // page fence key
 } *BtPage;
 
 //  hash table entries
@@ -195,10 +202,19 @@ typedef struct {
 #endif
 } BtPool;
 
+//  The loadpage interface object
+
+typedef struct {
+       uid page_no;            // current page number
+       BtPage page;            // current page pointer
+       BtPool *pool;           // current page pool
+       BtLatchSet *latch;      // current page latch set
+} BtPageSet;
+
 //     structure for latch manager on ALLOC_page
 
 typedef struct {
-       struct Page alloc[2];           // next & free page_nos in right ptr
+       struct BtPage_ alloc[2];        // next & free page_nos in right ptr
        BtLatch lock[1];                        // allocation area lite latch
        ushort latchdeployed;           // highest number of latch entries deployed
        ushort nlatchpage;                      // number of latch pages at BT_latch
@@ -240,13 +256,8 @@ typedef struct {
        BtPage cursor;          // cached frame for start/next (never mapped)
        BtPage frame;           // spare frame for the page split (never mapped)
        BtPage zero;            // page of zeroes to extend the file (never mapped)
-       BtPage page;            // current page mapped from file
-       uid page_no;            // current page number  
        uid cursor_page;        // current cursor page number   
-       BtLatchSet *set;        // current page latchset
-       BtPool *pool;           // current page pool
        unsigned char *mem;     // frame, cursor, page memory buffer
-       int parent;                     // last loadpage was from a parent level
        int found;                      // last delete or insert was found
        int err;                        // last error
 } BtDb;
@@ -271,9 +282,7 @@ extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_nextkey   (BtDb *bt, uint slot);
 
 //     internal functions
-BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
-uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
-BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
+BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence);
 
 //     manager functions
 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
@@ -319,8 +328,8 @@ extern uint bt_tod (BtDb *bt, uint slot);
 //     one with two keys.
 
 //     Deleted keys are marked with a dead bit until
-//     page cleanup The fence key for a node is always
-//     present, even after deletion and cleanup.
+//     page cleanup. The fence key for a node is
+//     present in a special array.
 
 //  Groups of pages called segments from the btree are optionally
 //  cached with a memory mapped pool. A hash table is used to keep
@@ -335,12 +344,13 @@ extern uint bt_tod (BtDb *bt, uint slot);
 //  Page 0 is dedicated to lock for new page extensions,
 //     and chains empty pages together for reuse.
 
-//     The ParentModification lock on a node is obtained to prevent resplitting
-//     or deleting a node before its fence is posted into its upper level.
+//     The ParentModification lock on a node is obtained to serialize posting
+//     or changing the fence key for a node.
 
 //     Empty pages are chained together through the ALLOC page and reused.
 
 //     Access macros to address slot and key values from the page
+//     Page slots use 1 based indexing.
 
 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
@@ -907,13 +917,12 @@ SYSTEM_INFO sysinfo[1];
        latchmgr->alloc->bits = mgr->page_bits;
 
        for( lvl=MIN_lvl; lvl--; ) {
-               slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3;
+               slotptr(latchmgr->alloc, 1)->off = offsetof(struct BtPage_, fence);
                bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
-               key = keyptr(latchmgr->alloc, 1);
-               key->len = 2;                   // create stopper key
-               key->key[0] = 0xff;
-               key->key[1] = 0xff;
-               latchmgr->alloc->min = mgr->page_size - 3;
+               latchmgr->alloc->fence[0] = 2;                  // create stopper key
+               latchmgr->alloc->fence[1] = 0xff;
+               latchmgr->alloc->fence[2] = 0xff;
+               latchmgr->alloc->min = mgr->page_size;
                latchmgr->alloc->lvl = lvl;
                latchmgr->alloc->cnt = 1;
                latchmgr->alloc->act = 1;
@@ -1106,7 +1115,7 @@ int flag;
 
 #ifdef unix
        flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
-       pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
+       pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED | MAP_POPULATE, bt->mgr->idx, off);
        if( pool->map == MAP_FAILED )
                return bt->err = BTERR_map;
 
@@ -1339,10 +1348,8 @@ void bt_unlockpage(BtLock mode, BtLatchSet *set)
 
 uid bt_newpage(BtDb *bt, BtPage page)
 {
-BtLatchSet *set;
-BtPool *pool;
+BtPageSet set[1];
 uid new_page;
-BtPage pmap;
 int reuse;
 
        //      lock allocation page
@@ -1353,12 +1360,13 @@ int reuse;
        // else allocate empty page
 
        if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
-               if( pool = bt_pinpool (bt, new_page) )
-                       pmap = bt_page (bt, pool, new_page);
+               if( set->pool = bt_pinpool (bt, new_page) )
+                       set->page = bt_page (bt, set->pool, new_page);
                else
                        return 0;
-               bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(pmap->right));
-               bt_unpinpool (pool);
+
+               bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(set->page->right));
+               bt_unpinpool (set->pool);
                reuse = 1;
        } else {
                new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
@@ -1374,7 +1382,6 @@ int reuse;
        if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
        {
                // use zero buffer to write zeros
-               memset(bt->zero, 0, bt->mgr->page_size);
                if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
                        return bt->err = BTERR_wrt, 0;
        }
@@ -1382,13 +1389,13 @@ int reuse;
        //      bring new page into pool and copy page.
        //      this will extend the file into the new pages.
 
-       if( pool = bt_pinpool (bt, new_page) )
-               pmap = bt_page (bt, pool, new_page);
+       if( set->pool = bt_pinpool (bt, new_page) )
+               set->page = bt_page (bt, set->pool, new_page);
        else
                return 0;
 
-       memcpy(pmap, page, bt->mgr->page_size);
-       bt_unpinpool (pool);
+       memcpy(set->page, page, bt->mgr->page_size);
+       bt_unpinpool (set->pool);
 #endif
        // unlock allocation latch and return new page no
 
@@ -1398,142 +1405,161 @@ int reuse;
 
 //  find slot in page for given key at a given level
 
-int bt_findslot (BtDb *bt, unsigned char *key, uint len)
+int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
 {
-uint diff, higher = bt->page->cnt, low = 1, slot;
-uint good = 0;
+uint diff, higher = set->page->cnt, low = 1, slot;
 
-       //      if no right link
        //        make stopper key an infinite fence value
-       //        by setting the good flag
 
-       if( bt_getid (bt->page->right) )
+       if( bt_getid (set->page->right) )
                higher++;
-       else
-               good++;
 
-       //      low is the next candidate.
+       //      low is the lowest candidate.
        //  loop ends when they meet
 
-       //  if good, higher is already
+       //  higher is already
        //      tested as .ge. the given key.
 
        while( diff = higher - low ) {
                slot = low + ( diff >> 1 );
-               if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
+               if( keycmp (keyptr(set->page, slot), key, len) < 0 )
                        low = slot + 1;
                else
-                       higher = slot, good++;
+                       higher = slot;
        }
 
+       if( higher <= set->page->cnt )
+               return higher;
+
+       //      if leaf page, compare against fence value
+
        //      return zero if key is on right link page
+       //      or return slot beyond last key
+
+       if( set->page->lvl || keycmp ((BtKey)set->page->fence, key, len) < 0 )
+               return 0;
 
-       return good ? higher : 0;
+       return higher;
 }
 
 //  find and load page at given level for given key
 //     leave page rd or wr locked as requested
 
-int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
+int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, uint lock)
 {
 uid page_no = ROOT_page, prevpage = 0;
-BtLatchSet *set, *prevset;
 uint drill = 0xff, slot;
+BtLatchSet *prevlatch;
 uint mode, prevmode;
 BtPool *prevpool;
-int parent = 1;
 
   //  start at root of btree and drill down
 
-  bt->set = NULL;
-
   do {
        // determine lock mode of drill level
        mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
 
-       bt->set = bt_pinlatch (bt, page_no);
-       bt->page_no = page_no;
+       set->latch = bt_pinlatch (bt, page_no);
+       set->page_no = page_no;
 
        // pin page contents
 
-       if( bt->pool = bt_pinpool (bt, page_no) )
-               bt->page = bt_page (bt, bt->pool, page_no);
+       if( set->pool = bt_pinpool (bt, page_no) )
+               set->page = bt_page (bt, set->pool, page_no);
        else
                return 0;
 
        // obtain access lock using lock chaining with Access mode
 
        if( page_no > ROOT_page )
-         bt_lockpage(BtLockAccess, bt->set);
+         bt_lockpage(BtLockAccess, set->latch);
 
        //      release & unpin parent page
 
        if( prevpage ) {
-         bt_unlockpage(prevmode, prevset);
-         bt_unpinlatch (prevset);
+         bt_unlockpage(prevmode, prevlatch);
+         bt_unpinlatch (prevlatch);
          bt_unpinpool (prevpool);
          prevpage = 0;
        }
 
        // obtain read lock using lock chaining
 
-       bt_lockpage(mode, bt->set);
+       bt_lockpage(mode, set->latch);
 
        if( page_no > ROOT_page )
-         bt_unlockpage(BtLockAccess, bt->set);
+         bt_unlockpage(BtLockAccess, set->latch);
 
        // re-read and re-lock root after determining actual level of root
 
-       if( bt->page->lvl != drill) {
-               if ( bt->page_no != ROOT_page )
+       if( set->page->lvl != drill) {
+               if ( set->page_no != ROOT_page )
                        return bt->err = BTERR_struct, 0;
                        
-               drill = bt->page->lvl;
+               drill = set->page->lvl;
 
                if( lock == BtLockWrite && drill == lvl ) {
-                 bt_unlockpage(mode, bt->set);
-                 bt_unpinlatch (bt->set);
-                 bt_unpinpool (bt->pool);
+                 bt_unlockpage(mode, set->latch);
+                 bt_unpinlatch (set->latch);
+                 bt_unpinpool (set->pool);
                  continue;
                }
        }
 
+       prevpage = set->page_no;
+       prevlatch = set->latch;
+       prevpool = set->pool;
+       prevmode = mode;
+
+       //      if page is being deleted and we should continue right
+
+       if( set->page->kill && set->page->goright ) {
+               page_no = bt_getid (set->page->right);
+               continue;
+       }
+
+       //      otherwise, wait for deleted node to clear
+
+       if( set->page->kill ) {
+               bt_unlockpage(mode, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
+               page_no = ROOT_page;
+               prevpage = 0;
+               drill = 0xff;
+#ifdef unix
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
+       
        //  find key on page at this level
        //  and descend to requested level
 
-       if( slot = bt_findslot (bt, key, len) ) {
+       if( slot = bt_findslot (set, key, len) ) {
          if( drill == lvl )
-               return bt->parent = parent, slot;
+               return slot;
+
+         if( slot > set->page->cnt )
+               return bt->err = BTERR_struct;
 
-         while( slotptr(bt->page, slot)->dead )
-               if( slot++ < bt->page->cnt )
+         while( slotptr(set->page, slot)->dead )
+               if( slot++ < set->page->cnt )
                        continue;
-               else {
-                       page_no = bt_getid(bt->page->right);
-                       parent = 0;
-                       goto slideright;
-               }
+               else
+                       return bt->err = BTERR_struct, 0;
 
-         page_no = bt_getid(slotptr(bt->page, slot)->id);
-         parent = 1;
+         page_no = bt_getid(slotptr(set->page, slot)->id);
          drill--;
+         continue;
        }
 
        //  or slide right into next page
 
-       else {
-               page_no = bt_getid(bt->page->right);
-               parent = 0;
-       }
-
-       //  continue down / right using overlapping locks
-       //  to protect pages being split.
+       page_no = bt_getid(set->page->right);
 
-slideright:
-       prevpage = bt->page_no;
-       prevpool = bt->pool;
-       prevset = bt->set;
-       prevmode = mode;
   } while( page_no );
 
   // return error on end of right chain
@@ -1542,189 +1568,287 @@ slideright:
   return 0;    // return error
 }
 
-//  remove empty page from the B-tree
-//     by pulling our right node left over ourselves
+// drill down fixing fence values for left sibling tree
 
-//  call with bt->page, etc, set to page's locked parent
-//     returns with page locked.
+//  call with set write locked
+//     return with set unlocked & unpinned.
 
-BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
+BTERR bt_fixfences (BtDb *bt, BtPageSet *set, unsigned char *newfence)
 {
-BtLatchSet *rset, *pset, *rpset;
-BtPool *rpool, *ppool, *rppool;
-BtPage rpage, ppage, rppage;
-uid right, parent, rparent;
-BtKey ptr;
-uint idx;
+unsigned char oldfence[256];
+BtPageSet next[1];
+int chk;
+
+  memcpy (oldfence, set->page->fence, 256);
+
+  while( !set->page->kill && set->page->lvl ) {
+       next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
+       next->latch = bt_pinlatch (bt, next->page_no);
+       bt_lockpage (BtLockParent, next->latch);
+       bt_lockpage (BtLockAccess, next->latch);
+       bt_lockpage (BtLockWrite, next->latch);
+       bt_unlockpage (BtLockAccess, next->latch);
+
+       if( next->pool = bt_pinpool (bt, next->page_no) )
+               next->page = bt_page (bt, next->pool, next->page_no);
+       else
+               return bt->err;
 
-       //      cache node's parent page
+       chk = keycmp ((BtKey)next->page->fence, oldfence + 1, *oldfence);
 
-       parent = bt->page_no;
-       ppage = bt->page;
-       ppool = bt->pool;
-       pset = bt->set;
+       if( chk < 0 ) {
+         next->page_no = bt_getid (next->page->right);
+         bt_unlockpage (BtLockWrite, next->latch);
+         bt_unlockpage (BtLockParent, next->latch);
+         bt_unpinlatch (next->latch);
+         bt_unpinpool (next->pool);
+         continue;
+       }
 
-       // lock and map our right page
-       //      it cannot be NULL because of the stopper
-       //      in the last right page
+       if( chk > 0 )
+               return bt->err = BTERR_struct;
 
-       bt_lockpage (BtLockWrite, set);
+       if( bt_fixfences (bt, next, newfence) )
+               return bt->err;
 
-       // if we aren't dead yet
+       break;
+  }
 
-       if( page->act )
-               goto rmergexit;
+  memcpy (set->page->fence, newfence, 256);
 
-       if( right = bt_getid (page->right) )
-         if( rpool = bt_pinpool (bt, right) )
-               rpage = bt_page (bt, rpool, right);
-         else
-               return bt->err;
-       else
-               return bt->err = BTERR_struct;
+  bt_unlockpage (BtLockWrite, set->latch);
+  bt_unlockpage (BtLockParent, set->latch);
+  bt_unpinlatch (set->latch);
+  bt_unpinpool (set->pool);
+  return 0;
+}
 
-       rset = bt_pinlatch (bt, right);
+//     return page to free list
+//     page must be delete & write locked
 
-       //      find our right neighbor
+void bt_freepage (BtDb *bt, BtPageSet *set)
+{
+       //      lock allocation page
 
-       if( ppage->act > 1 ) {
-        for( idx = slot; idx++ < ppage->cnt; )
-         if( !slotptr(ppage, idx)->dead )
-               break;
+       bt_spinwritelock (bt->mgr->latchmgr->lock, 0);
 
-        if( idx > ppage->cnt )
-               return bt->err = BTERR_struct;
+       //      store chain in second right
+       bt_putid(set->page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+       bt_putid(bt->mgr->latchmgr->alloc[1].right, set->page_no);
+       set->page->free = 1;
 
-        //  redirect right neighbor in parent to left node
+       // unlock released page
 
-        bt_putid(slotptr(ppage,idx)->id, page_no);
-       }
+       bt_unlockpage (BtLockDelete, set->latch);
+       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
 
-       //      if parent has only our deleted page, e.g. no right neighbor
-       //      prepare to merge parent itself
+       // unlock allocation page
 
-       if( ppage->act == 1 ) {
-         if( rparent = bt_getid (ppage->right) )
-          if( rppool = bt_pinpool (bt, rparent) )
-               rppage = bt_page (bt, rppool, rparent);
-          else
-               return bt->err;
+       bt_spinreleasewrite (bt->mgr->latchmgr->lock, 0);
+}
+
+//     remove the root level by promoting its only child
+//     call with parent and child pages
+
+BTERR bt_removeroot (BtDb *bt, BtPageSet *root, BtPageSet *child)
+{
+uid next = 0;
+
+  do {
+       if( next ) {
+         child->latch = bt_pinlatch (bt, next);
+         bt_lockpage (BtLockDelete, child->latch);
+         bt_lockpage (BtLockWrite, child->latch);
+
+         if( child->pool = bt_pinpool (bt, next) )
+               child->page = bt_page (bt, child->pool, next);
          else
-               return bt->err = BTERR_struct;
+               return bt->err;
+
+         child->page_no = next;
+       }
+
+       memcpy (root->page, child->page, bt->mgr->page_size);
+       next = bt_getid (slotptr(child->page, child->page->cnt)->id);
+       bt_freepage (bt, child);
+  } while( root->page->lvl > 1 && root->page->cnt == 1 );
+
+  bt_unlockpage (BtLockWrite, root->latch);
+  bt_unpinlatch (root->latch);
+  bt_unpinpool (root->pool);
+  return 0;
+}
+
+//  pull right page over ourselves in simple merge
+
+BTERR bt_mergeright (BtDb *bt, BtPageSet *set, BtPageSet *parent, BtPageSet *right, uint slot, uint idx)
+{
+       //  install ourselves as child page
+       //      and delete ourselves from parent
 
-         rpset = bt_pinlatch (bt, rparent);
-         bt_lockpage (BtLockWrite, rpset);
+       bt_putid (slotptr(parent->page, idx)->id, set->page_no);
+       slotptr(parent->page, slot)->dead = 1;
+       parent->page->act--;
 
-         // find our right neighbor on right parent page
+       //      collapse any empty slots
 
-         for( idx = 0; idx++ < rppage->cnt; )
-               if( !slotptr(rppage, idx)->dead ) {
-                 bt_putid (slotptr(rppage, idx)->id, page_no);
+       while( idx = parent->page->cnt - 1 )
+         if( slotptr(parent->page, idx)->dead ) {
+               *slotptr(parent->page, idx) = *slotptr(parent->page, idx + 1);
+               memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
+         } else
                  break;
-               }
 
-         if( idx > rppage->cnt )
-               return bt->err = BTERR_struct;
-       }
+       memcpy (set->page, right->page, bt->mgr->page_size);
+       bt_unlockpage (BtLockParent, right->latch);
 
-       //      now that there are no more pointers to our right node
-       //      we can wait for delete lock on it
+       bt_freepage (bt, right);
 
-       bt_lockpage(BtLockDelete, rset);
-       bt_lockpage(BtLockWrite, rset);
+       //      do we need to remove a btree level?
+       //      (leave the first page of leaves alone)
 
-       // pull contents of right page into our empty page 
+       if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
+         if( set->page->lvl )
+               return bt_removeroot (bt, parent, set);
 
-       memcpy (page, rpage, bt->mgr->page_size);
+       bt_unlockpage (BtLockWrite, parent->latch);
+       bt_unlockpage (BtLockDelete, set->latch);
+       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (parent->latch);
+       bt_unpinpool (parent->pool);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
+       return 0;
+}
 
-       // ready to release right parent lock
-       //      now that we have a new page in place
+//     remove both child and parent from the btree
+//     from the fence position in the parent
+//     call with both pages locked for writing
 
-       if( ppage->act == 1 ) {
-         bt_unlockpage (BtLockWrite, rpset);
-         bt_unpinlatch (rpset);
-         bt_unpinpool (rppool);
-       }
+BTERR bt_removeparent (BtDb *bt, BtPageSet *child, BtPageSet *parent, BtPageSet *right, BtPageSet *rparent, uint lvl)
+{
+unsigned char pagefence[256];
+uint idx;
 
-       //      add killed right block to free chain
-       //      lock latch mgr
+       //  pull right sibling over ourselves and unlock
 
-       bt_spinwritelock(bt->mgr->latchmgr->lock, 0);
+       memcpy (child->page, right->page, bt->mgr->page_size);
 
-       //      store free chain in allocation page second right
+       bt_unlockpage (BtLockWrite, child->latch);
+       bt_unpinlatch (child->latch);
+       bt_unpinpool (child->pool);
 
-       bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
-       bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+       //  install ourselves into right link of old right page
 
-       // unlock latch mgr and right page
+       bt_putid (right->page->right, child->page_no);
+       right->page->goright = 1;       // tell bt_loadpage to go right to us
+       right->page->kill = 1;
 
-       bt_unlockpage(BtLockDelete, rset);
-       bt_unlockpage(BtLockWrite, rset);
-       bt_unpinlatch (rset);
-       bt_unpinpool (rpool);
+       bt_unlockpage (BtLockWrite, right->latch);
 
-       bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0);
+       //      remove our slot from our parent
+       //      signal to move right
 
-       // delete our obsolete fence key from our parent
+       parent->page->goright = 1;      // tell bt_loadpage to go right to rparent
+       parent->page->kill = 1;
+       parent->page->act--;
 
-       slotptr(ppage, slot)->dead = 1;
-       ppage->dirty = 1;
+       //      redirect right page pointer in right parent to us
 
-       //      if our parent now empty
-       //      remove it from the tree
+       for( idx = 0; idx++ < rparent->page->cnt; )
+         if( !slotptr(rparent->page, idx)->dead )
+               break;
 
-       if( ppage->act-- == 1 )
-         if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
-               return bt->err;
+       if( bt_getid (slotptr(rparent->page, idx)->id) != right->page_no )
+               return bt->err = BTERR_struct;
 
-rmergexit:
-       bt_unlockpage (BtLockWrite, pset);
-       bt_unpinlatch (pset);
-       bt_unpinpool (ppool);
+       bt_putid (slotptr(rparent->page, idx)->id, child->page_no);
+       bt_unlockpage (BtLockWrite, rparent->latch);
+       bt_unpinlatch (rparent->latch);
+       bt_unpinpool (rparent->pool);
 
-       bt->found = 1;
-       return bt->err = 0;
-}
+       //      free the right page
+
+       bt_lockpage (BtLockDelete, right->latch);
+       bt_lockpage (BtLockWrite, right->latch);
+       bt_freepage (bt, right);
+
+       //      save parent page fence value
 
-//  remove empty page from the B-tree
-//     try merging left first.  If no left
-//     sibling, then merge right.
+       memcpy (pagefence, parent->page->fence, 256);
+       bt_unlockpage (BtLockWrite, parent->latch);
 
-//     call with page loaded and locked,
-//  return with page locked.
+       return bt_removepage (bt, parent, lvl, pagefence);
+}
+
+//     remove page from btree
+//     call with page unlocked
+//     returns with page on free list
 
-BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
+BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence)
 {
-unsigned char fencekey[256], postkey[256];
-uint slot, idx, postfence = 0;
-BtLatchSet *lset, *pset;
-BtPool *lpool, *ppool;
-BtPage lpage, ppage;
-uid left, parent;
+BtPageSet parent[1], sibling[1], rparent[1];
+unsigned char newfence[256];
+uint slot, idx;
 BtKey ptr;
 
-       ptr = keyptr(page, page->cnt);
-       memcpy(fencekey, ptr, ptr->len + 1);
-       bt_unlockpage (BtLockWrite, set);
-
        //      load and lock our parent
 
 retry:
-       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
+       if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
                return bt->err;
 
-       parent = bt->page_no;
-       ppage = bt->page;
-       ppool = bt->pool;
-       pset = bt->set;
+       //      can we do a simple merge entirely
+       //      between siblings on the parent page?
 
-       //      wait until we are posted in our parent
+       if( slot < parent->page->cnt ) {
+               // find our right neighbor
+               //      right must exist because the stopper prevents
+               //      the rightmost page from deleting
 
-       if( !bt->parent ) {
-               bt_unlockpage (BtLockWrite, pset);
-               bt_unpinlatch (pset);
-               bt_unpinpool (ppool);
-#ifdef unix
+               for( idx = slot; idx++ < parent->page->cnt; )
+                 if( !slotptr(parent->page, idx)->dead )
+                       break;
+
+               sibling->page_no = bt_getid (slotptr (parent->page, idx)->id);
+
+               bt_lockpage (BtLockDelete, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
+
+               //      merge right if sibling shows up in
+               //  our parent and is not being killed
+
+               if( sibling->page_no == bt_getid (set->page->right) ) {
+                 sibling->latch = bt_pinlatch (bt, sibling->page_no);
+                 bt_lockpage (BtLockParent, sibling->latch);
+                 bt_lockpage (BtLockDelete, sibling->latch);
+                 bt_lockpage (BtLockWrite, sibling->latch);
+
+                 if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+                       sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+                 else
+                       return bt->err;
+
+                 if( !sibling->page->kill )
+                       return bt_mergeright(bt, set, parent, sibling, slot, idx);
+
+                 //  try again later
+
+                 bt_unlockpage (BtLockWrite, sibling->latch);
+                 bt_unlockpage (BtLockParent, sibling->latch);
+                 bt_unlockpage (BtLockDelete, sibling->latch);
+                 bt_unpinlatch (sibling->latch);
+                 bt_unpinpool (sibling->pool);
+               }
+
+               bt_unlockpage (BtLockDelete, set->latch);
+               bt_unlockpage (BtLockWrite, set->latch);
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+#ifdef linux
                sched_yield();
 #else
                SwitchToThread();
@@ -1735,35 +1859,59 @@ retry:
        //  find our left neighbor in our parent page
 
        for( idx = slot; --idx; )
-         if( !slotptr(ppage, idx)->dead )
+         if( !slotptr(parent->page, idx)->dead )
                break;
 
-       //      if no left neighbor, do right merge
+       //      if no left neighbor, delete ourselves and our parent
 
-       if( !idx )
-               return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
+       if( !idx ) {
+               bt_lockpage (BtLockAccess, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
+               bt_unlockpage (BtLockAccess, set->latch);
 
-       // lock and map our left neighbor's page
+               rparent->page_no = bt_getid (parent->page->right);
+               rparent->latch = bt_pinlatch (bt, rparent->page_no);
 
-       left = bt_getid (slotptr(ppage, idx)->id);
+               bt_lockpage (BtLockAccess, rparent->latch);
+               bt_lockpage (BtLockWrite, rparent->latch);
+               bt_unlockpage (BtLockAccess, rparent->latch);
 
-       if( lpool = bt_pinpool (bt, left) )
-               lpage = bt_page (bt, lpool, left);
-       else
-               return bt->err;
+               if( rparent->pool = bt_pinpool (bt, rparent->page_no) )
+                       rparent->page = bt_page (bt, rparent->pool, rparent->page_no);
+               else
+                       return bt->err;
+
+               if( !rparent->page->kill ) {
+                 sibling->page_no = bt_getid (set->page->right);
+                 sibling->latch = bt_pinlatch (bt, sibling->page_no);
 
-       lset = bt_pinlatch (bt, left);
-       bt_lockpage(BtLockWrite, lset);
+                 bt_lockpage (BtLockAccess, sibling->latch);
+                 bt_lockpage (BtLockWrite, sibling->latch);
+                 bt_unlockpage (BtLockAccess, sibling->latch);
 
-       //  wait until sibling is in our parent
+                 if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+                       sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+                 else
+                       return bt->err;
+
+                 if( !sibling->page->kill )
+                       return bt_removeparent (bt, set, parent, sibling, rparent, lvl+1);
+
+                 //  try again later
 
-       if( bt_getid (lpage->right) != page_no ) {
-               bt_unlockpage (BtLockWrite, pset);
-               bt_unpinlatch (pset);
-               bt_unpinpool (ppool);
-               bt_unlockpage (BtLockWrite, lset);
-               bt_unpinlatch (lset);
-               bt_unpinpool (lpool);
+                 bt_unlockpage (BtLockWrite, sibling->latch);
+                 bt_unpinlatch (sibling->latch);
+                 bt_unpinpool (sibling->pool);
+               }
+
+               bt_unlockpage (BtLockWrite, set->latch);
+               bt_unlockpage (BtLockWrite, rparent->latch);
+               bt_unpinlatch (rparent->latch);
+               bt_unpinpool (rparent->pool);
+
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
 #ifdef linux
                sched_yield();
 #else
@@ -1772,114 +1920,97 @@ retry:
                goto retry;
        }
 
-       //  since our page will have no more pointers to it,
-       //      obtain Delete lock and wait for write locks to clear
-
-       bt_lockpage(BtLockDelete, set);
-       bt_lockpage(BtLockWrite, set);
-
-       //      if we aren't dead yet,
-       //      get ready for exit
-
-       if( page->act ) {
-               bt_unlockpage(BtLockDelete, set);
-               bt_unlockpage(BtLockWrite, lset);
-               bt_unpinlatch (lset);
-               bt_unpinpool (lpool);
-               goto lmergexit;
-       }
-
-       //      are we are the fence key for our parent?
-       //      if so, grab our old fence key
-
-       if( postfence = slot == ppage->cnt ) {
-               ptr = keyptr (ppage, ppage->cnt);
-               memcpy(fencekey, ptr, ptr->len + 1);
-               memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
-
-               // clear out other dead slots
-
-               while( --ppage->cnt )
-                 if( slotptr(ppage, ppage->cnt)->dead )
-                       memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
-                 else
-                       break;
+       // redirect parent to our left sibling
+       // lock and map our left sibling's page
 
-               ptr = keyptr (ppage, ppage->cnt);
-               memcpy(postkey, ptr, ptr->len + 1);
-       } else
-               slotptr(ppage,slot)->dead = 1;
+       sibling->page_no = bt_getid (slotptr(parent->page, idx)->id);
+       sibling->latch = bt_pinlatch (bt, sibling->page_no);
 
-       ppage->dirty = 1;
-       ppage->act--;
+       //      wait our turn on fence key maintenance
 
-       //      push our right neighbor pointer to our left
+       bt_lockpage(BtLockParent, sibling->latch);
+       bt_lockpage(BtLockAccess, sibling->latch);
+       bt_lockpage(BtLockWrite, sibling->latch);
+       bt_unlockpage(BtLockAccess, sibling->latch);
 
-       memcpy (lpage->right, page->right, BtId);
+       if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+               sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+       else
+               return bt->err;
 
-       //      add ourselves to free chain
-       //      lock latch mgr
+       //  wait until left sibling is in our parent
 
-       bt_spinwritelock(bt->mgr->latchmgr->lock, 0);
+       if( bt_getid (sibling->page->right) != set->page_no ) {
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unlockpage (BtLockWrite, sibling->latch);
+               bt_unlockpage (BtLockParent, sibling->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+               bt_unpinlatch (sibling->latch);
+               bt_unpinpool (sibling->pool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               goto retry;
+       }
 
-       //      store free chain in allocation page second right
-       bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
-       bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
+       //      delete our left sibling from parent
 
-       // unlock latch mgr and pages
+       slotptr(parent->page,idx)->dead = 1;
+       parent->page->dirty = 1;
+       parent->page->act--;
 
-       bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0);
-       bt_unlockpage(BtLockWrite, lset);
-       bt_unpinlatch (lset);
-       bt_unpinpool (lpool);
+       //      redirect our parent slot to our left sibling
 
-       // release our node's delete lock
+       bt_putid (slotptr(parent->page, slot)->id, sibling->page_no);
+       memcpy (sibling->page->right, set->page->right, BtId);
 
-       bt_unlockpage(BtLockDelete, set);
+       //      collapse dead slots from parent
 
-lmergexit:
-       bt_unlockpage (BtLockWrite, pset);
-       bt_unpinpool (ppool);
+       while( idx = parent->page->cnt - 1 )
+         if( slotptr(parent->page, idx)->dead ) {
+               *slotptr(parent->page, idx) = *slotptr(parent->page, parent->page->cnt);
+               memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
+         } else
+                 break;
 
-       //  do we need to post parent's fence key in its parent?
+       //  free our original page
 
-       if( !postfence || parent == ROOT_page ) {
-               bt_unpinlatch (pset);
-               bt->found = 1;
-               return bt->err = 0;
-       }
+       bt_lockpage (BtLockDelete, set->latch);
+       bt_lockpage (BtLockWrite, set->latch);
+       bt_freepage (bt, set);
 
-       //      interlock parent fence post
+       //      go down the left node's fence keys to the leaf level
+       //      and update the fence keys in each page
 
-       bt_lockpage (BtLockParent, pset);
+       memcpy (newfence, parent->page->fence, 256);
 
-       //      load parent's parent page
-posttry:
-       if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
+       if( bt_fixfences (bt, sibling, newfence) )
                return bt->err;
 
-       if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
-         if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
-               return bt->err;
-         else
-               goto posttry;
+       //  promote sibling as new root?
 
-       page = bt->page;
+       if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
+        if( sibling->page->lvl ) {
+         sibling->latch = bt_pinlatch (bt, sibling->page_no);
+         bt_lockpage (BtLockDelete, sibling->latch);
+         bt_lockpage (BtLockWrite, sibling->latch);
 
-       page->min -= *postkey + 1;
-       ((unsigned char *)page)[page->min] = *postkey;
-       memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
-       slotptr(page, slot)->off = page->min;
+         if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+               sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+         else
+               return bt->err;
 
-       bt_unlockpage (BtLockParent, pset);
-       bt_unpinlatch (pset);
+         return bt_removeroot (bt, parent, sibling);
+        }
 
-       bt_unlockpage (BtLockWrite, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage (BtLockWrite, parent->latch);
+       bt_unpinlatch (parent->latch);
+       bt_unpinpool (parent->pool);
 
-       bt->found = 1;
-       return bt->err = 0;
+       return 0;
 }
 
 //  find and delete key on page by marking delete flag bit
@@ -1887,65 +2018,79 @@ posttry:
 
 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
 {
-BtLatchSet *set;
-BtPool *pool;
-BtPage page;
-uid page_no;
+unsigned char pagefence[256];
+uint slot, idx, found;
+BtPageSet set[1];
 BtKey ptr;
-uint slot;
 
-       if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
+       if( slot = bt_loadpage (bt, set, key, len, 0, BtLockWrite) )
+               ptr = keyptr(set->page, slot);
+       else
                return bt->err;
 
-       page_no = bt->page_no;
-       page = bt->page;
-       pool = bt->pool;
-       set = bt->set;
-
        // if key is found delete it, otherwise ignore request
 
-       ptr = keyptr(page, slot);
+       if( found = slot <= set->page->cnt )
+         if( found = !keycmp (ptr, key, len) )
+               if( found = slotptr(set->page, slot)->dead == 0 ) {
+                 slotptr(set->page,slot)->dead = 1;
+                 set->page->dirty = 1;
+                 set->page->act--;
 
-       if( bt->found = !keycmp (ptr, key, len) )
-         if( bt->found = slotptr(page, slot)->dead == 0 ) {
-               slotptr(page,slot)->dead = 1;
-                 if( slot < page->cnt )
-                       page->dirty = 1;
-                 if( !--page->act )
-                       if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
-                         return bt->err;
+                 // collapse empty slots
+
+                 while( idx = set->page->cnt - 1 )
+                       if( slotptr(set->page, idx)->dead ) {
+                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
+                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
+                       } else
+                               break;
                }
 
-       bt_unlockpage(BtLockWrite, set);
-       bt_unpinlatch (set);
-       bt_unpinpool (pool);
-       return bt->err = 0;
+       if( set->page->act ) {
+               bt_unlockpage(BtLockWrite, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
+               return bt->found = found, 0;
+       }
+
+       memcpy (pagefence, set->page->fence, 256);
+       set->page->kill = 1;
+
+       bt_unlockpage (BtLockWrite, set->latch);
+
+       if( bt_removepage (bt, set, 0, pagefence) )
+               return bt->err;
+
+       bt->found = found;
+       return 0;
 }
 
 //     find key in leaf level and return row-id
 
 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
 {
+BtPageSet set[1];
 uint  slot;
 BtKey ptr;
 uid id;
 
-       if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
-               ptr = keyptr(bt->page, slot);
+       if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
+               ptr = keyptr(set->page, slot);
        else
                return 0;
 
        // if key exists, return row-id
        //      otherwise return 0
 
-       if( slot <= bt->page->cnt && !keycmp (ptr, key, len) )
-               id = bt_getid(slotptr(bt->page,slot)->id);
+       if( !keycmp (ptr, key, len) )
+               id = bt_getid(slotptr(set->page,slot)->id);
        else
                id = 0;
 
-       bt_unlockpage (BtLockRead, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage (BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        return id;
 }
 
@@ -1956,7 +2101,7 @@ uid id;
 
 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
 {
-uint nxt = bt->mgr->page_size;
+uint nxt = bt->mgr->page_size, off;
 uint cnt = 0, idx = 0;
 uint max = page->cnt;
 uint newslot = max;
@@ -1979,28 +2124,31 @@ BtKey key;
        page->act = 0;
 
        // try cleaning up page first
-
-       // always leave fence key in the array
-       // otherwise, remove deleted key
+       // by removing deleted keys
 
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 1;
-               if( cnt < max && slotptr(bt->frame,cnt)->dead )
+               if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
-               // copy key
+               // if its not the fence key,
+               // copy the key across
 
-               key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)page + nxt, key, key->len + 1);
+               off = slotptr(bt->frame,cnt)->off;
+
+               if( off >= sizeof(*page) ) {
+                       key = keyptr(bt->frame, cnt);
+                       off = nxt -= key->len + 1;
+                       memcpy ((unsigned char *)page + nxt, key, key->len + 1);
+               }
 
                // copy slot
+
                memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
-               if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
-                       page->act++;
                slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
-               slotptr(page, idx)->off = nxt;
+               slotptr(page, idx)->off = off;
+               page->act++;
        }
 
        page->min = nxt;
@@ -2014,202 +2162,192 @@ BtKey key;
        return 0;
 }
 
-//     add key to current page
-//     page must already be writelocked
+// split the root and raise the height of the btree
 
-void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
-{
-uint idx;
-
-       // find next available dead slot and copy key onto page
-
-       for( idx = slot; idx < page->cnt; idx++ )
-         if( slotptr(page, idx)->dead )
-               break;
-
-       if( idx == page->cnt )
-               idx++, page->cnt++;
-
-       page->act++;
-
-       // now insert key into array before slot
-
-       while( idx > slot )
-               *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
-
-       page->min -= len + 1;
-       ((unsigned char *)page)[page->min] = len;
-       memcpy ((unsigned char *)page + page->min +1, key, len );
-
-       bt_putid(slotptr(page,slot)->id, id);
-       slotptr(page, slot)->off = page->min;
-       slotptr(page, slot)->tod = tod;
-       slotptr(page, slot)->dead = 0;
-}
-
-BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
+BTERR bt_splitroot(BtDb *bt, BtPageSet *root, uid page_no2)
 {
 uint nxt = bt->mgr->page_size;
-BtPage root = bt->page;
+unsigned char leftkey[256];
 uid new_page;
 
        //  Obtain an empty page to use, and copy the current
-       //  root contents into it
+       //  root contents into it, e.g. lower keys
+
+       memcpy (leftkey, root->page->fence, 256);
+       root->page->posted = 1;
 
-       if( !(new_page = bt_newpage(bt, root)) )
+       if( !(new_page = bt_newpage(bt, root->page)) )
                return bt->err;
 
        // preserve the page info at the bottom
-       // and set rest to zero
+       // of higher keys and set rest to zero
 
-       memset(root+1, 0, bt->mgr->page_size - sizeof(*root));
+       memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
+       memset(root->page->fence, 0, 256);
+       root->page->fence[0] = 2;
+       root->page->fence[1] = 0xff;
+       root->page->fence[2] = 0xff;
 
-       // insert first key on newroot page
+       // insert lower keys page fence key on newroot page
 
        nxt -= *leftkey + 1;
-       memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
-       bt_putid(slotptr(root, 1)->id, new_page);
-       slotptr(root, 1)->off = nxt;
+       memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
+       bt_putid(slotptr(root->page, 1)->id, new_page);
+       slotptr(root->page, 1)->off = nxt;
        
-       // insert second key (stopper key) on newroot page
+       // insert stopper key on newroot page
        // and increase the root height
 
-       nxt -= 3;
-       *((unsigned char *)root + nxt) = 2;
-       memset ((unsigned char *)root + nxt + 1, 0xff, 2);
-       bt_putid(slotptr(root, 2)->id, page_no2);
-       slotptr(root, 2)->off = nxt;
+       bt_putid(slotptr(root->page, 2)->id, page_no2);
+       slotptr(root->page, 2)->off = offsetof(struct BtPage_, fence);
 
-       bt_putid(root->right, 0);
-       root->min = nxt;                // reset lowest used offset and key count
-       root->cnt = 2;
-       root->act = 2;
-       root->lvl++;
+       bt_putid(root->page->right, 0);
+       root->page->min = nxt;          // reset lowest used offset and key count
+       root->page->cnt = 2;
+       root->page->act = 2;
+       root->page->lvl++;
 
-       // release and unpin root (bt->page)
+       // release and unpin root
 
-       bt_unlockpage(BtLockWrite, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage(BtLockWrite, root->latch);
+       bt_unpinlatch (root->latch);
+       bt_unpinpool (root->pool);
        return 0;
 }
 
 //  split already locked full node
-//     return unlocked and unpinned.
+//     return unlocked.
 
-BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
+BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
 {
-uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
-unsigned char rightkey[256], leftkey[256];
-uint tod = time(NULL);
-uint lvl = page->lvl;
-uid new_page;
+uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size, off;
+unsigned char fencekey[256];
+uint lvl = set->page->lvl;
+uid right;
 BtKey key;
 
-       //      initialize frame buffer for right node
+       //  split higher half of keys to bt->frame
 
        memset (bt->frame, 0, bt->mgr->page_size);
-       max = page->cnt;
+       max = set->page->cnt;
        cnt = max / 2;
        idx = 0;
 
-       //  split higher half of keys to bt->frame
-
        while( cnt++ < max ) {
-               key = keyptr(page, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
-               memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
-               if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
-                       bt->frame->act++;
-               slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
-               slotptr(bt->frame, idx)->off = nxt;
+               if( !lvl || cnt < max ) {
+                       key = keyptr(set->page, cnt);
+                       off = nxt -= key->len + 1;
+                       memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
+               } else
+                       off = offsetof(struct BtPage_, fence);
+
+               memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId);
+               slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod;
+               slotptr(bt->frame, idx)->off = off;
+               bt->frame->act++;
        }
 
-       // transfer right link node to new right node
-
-       if( page_no > ROOT_page )
-               memcpy (bt->frame->right, page->right, BtId);
+       if( set->page_no == ROOT_page )
+               bt->frame->posted = 1;
 
+       memcpy (bt->frame->fence, set->page->fence, 256);
        bt->frame->bits = bt->mgr->page_bits;
        bt->frame->min = nxt;
        bt->frame->cnt = idx;
        bt->frame->lvl = lvl;
 
-       //      get new free page and write right frame to it.
+       // link right node
 
-       if( !(new_page = bt_newpage(bt, bt->frame)) )
-               return bt->err;
+       if( set->page_no > ROOT_page )
+               memcpy (bt->frame->right, set->page->right, BtId);
 
-       //      remember fence key for new right page to add
-       //      as right sibling to the left node
+       //      get new free page and write higher keys to it.
 
-       key = keyptr(bt->frame, idx);
-       memcpy (rightkey, key, key->len + 1);
+       if( !(right = bt_newpage(bt, bt->frame)) )
+               return bt->err;
 
        //      update lower keys to continue in old page
 
-       memcpy (bt->frame, page, bt->mgr->page_size);
-       memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
+       memcpy (bt->frame, set->page, bt->mgr->page_size);
+       memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
        nxt = bt->mgr->page_size;
-       page->dirty = 0;
-       page->act = 0;
+       set->page->posted = 0;
+       set->page->dirty = 0;
+       set->page->act = 0;
        cnt = 0;
        idx = 0;
 
        //  assemble page of smaller keys
-       //      to remain in the old page
 
        while( cnt++ < max / 2 ) {
                key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)page + nxt, key, key->len + 1);
-               memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
-               if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
-                       page->act++;
-               slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
-               slotptr(page, idx)->off = nxt;
-       }
 
-       // finalize left page and save fence key
+               if( !lvl || cnt < max / 2 ) {
+                       off = nxt -= key->len + 1;
+                       memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
+               } else 
+                       off = offsetof(struct BtPage_, fence);
 
-       memcpy(leftkey, key, key->len + 1);
-       page->min = nxt;
-       page->cnt = idx;
+               memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
+               slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod;
+               slotptr(set->page, idx)->off = off;
+               set->page->act++;
+       }
+
+       // install fence key for smaller key page
 
-       //      link new right page
+       memset(set->page->fence, 0, 256);
+       memcpy(set->page->fence, key, key->len + 1);
 
-       bt_putid (page->right, new_page);
+       bt_putid(set->page->right, right);
+       set->page->min = nxt;
+       set->page->cnt = idx;
 
        // if current page is the root page, split it
 
-       if( page_no == ROOT_page )
-               return bt_splitroot (bt, leftkey, new_page);
+       if( set->page_no == ROOT_page )
+               return bt_splitroot (bt, set, right);
 
-       //  obtain ParentModification lock for current page
+       bt_unlockpage (BtLockWrite, set->latch);
 
-       bt_lockpage (BtLockParent, set);
+       // insert new fences in their parent pages
 
-       //  release wr lock on our page.
-       //      this will keep out another SMO
+       while( 1 ) {
+               bt_lockpage (BtLockParent, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
 
-       bt_unlockpage (BtLockWrite, set);
+               memcpy (fencekey, set->page->fence, 256);
+               right = bt_getid (set->page->right);
 
-       //  insert key for old page (lower keys)
+               if( set->page->posted ) {
+                       bt_unlockpage (BtLockParent, set->latch);
+                       bt_unlockpage (BtLockWrite, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
+                       return 0;
+               }
 
-       if( bt_insertkey (bt, leftkey + 1, *leftkey, page_no, tod, lvl + 1) )
-               return bt->err;
+               set->page->posted = 1;
+               bt_unlockpage (BtLockWrite, set->latch);
 
-       //      switch old parent key from us to our right page
+               if( bt_insertkey (bt, fencekey+1, *fencekey, set->page_no, time(NULL), lvl+1) )
+                       return bt->err;
 
-       if( bt_insertkey (bt, rightkey + 1, *rightkey, new_page, tod, lvl + 1) )
-               return bt->err;
+               bt_unlockpage (BtLockParent, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
 
-       //      unlock and unpin
+               if( !(set->page_no = right) )
+                       break;
+
+               set->latch = bt_pinlatch (bt, right);
+
+               if( set->pool = bt_pinpool (bt, right) )
+                       set->page = bt_page (bt, set->pool, right);
+               else
+                       return bt->err;
+       }
 
-       bt_unlockpage (BtLockParent, set);
-       bt_unpinlatch (set);
-       bt_unpinpool (pool);
        return 0;
 }
 
@@ -2217,13 +2355,13 @@ BtKey key;
 
 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
 {
+BtPageSet set[1];
 uint slot, idx;
-BtPage page;
 BtKey ptr;
 
        while( 1 ) {
-               if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
-                       ptr = keyptr(bt->page, slot);
+               if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
+                       ptr = keyptr(set->page, slot);
                else
                {
                        if ( !bt->err )
@@ -2233,34 +2371,56 @@ BtKey ptr;
 
                // if key already exists, update id and return
 
-               page = bt->page;
-
-               if( !keycmp (ptr, key, len) ) {
-                       if( slotptr(page, slot)->dead )
-                               page->act++;
-                       slotptr(page, slot)->dead = 0;
-                       slotptr(page, slot)->tod = tod;
-                       bt_putid(slotptr(page,slot)->id, id);
-                       bt_unlockpage(BtLockWrite, bt->set);
-                       bt_unpinlatch (bt->set);
-                       bt_unpinpool (bt->pool);
-                       return bt->err;
+               if( slot <= set->page->cnt )
+                 if( !keycmp (ptr, key, len) ) {
+                       if( slotptr(set->page, slot)->dead )
+                               set->page->act++;
+                       slotptr(set->page, slot)->dead = 0;
+                       slotptr(set->page, slot)->tod = tod;
+                       bt_putid(slotptr(set->page,slot)->id, id);
+                       bt_unlockpage(BtLockWrite, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
+                       return 0;
                }
 
                // check if page has enough space
 
-               if( slot = bt_cleanpage (bt, bt->page, len, slot) )
+               if( slot = bt_cleanpage (bt, set->page, len, slot) )
                        break;
 
-               if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+               if( bt_splitpage (bt, set) )
                        return bt->err;
        }
 
-       bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
+       // calculate next available slot and copy key into page
+
+       set->page->min -= len + 1; // reset lowest used offset
+       ((unsigned char *)set->page)[set->page->min] = len;
+       memcpy ((unsigned char *)set->page + set->page->min +1, key, len );
+
+       for( idx = slot; idx <= set->page->cnt; idx++ )
+         if( slotptr(set->page, idx)->dead )
+               break;
+
+       // now insert key into array before slot
+
+       if( idx > set->page->cnt )
+               set->page->cnt++;
+
+       set->page->act++;
 
-       bt_unlockpage (BtLockWrite, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       while( idx > slot )
+               *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
+
+       bt_putid(slotptr(set->page,slot)->id, id);
+       slotptr(set->page, slot)->off = set->page->min;
+       slotptr(set->page, slot)->tod = tod;
+       slotptr(set->page, slot)->dead = 0;
+
+       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        return 0;
 }
 
@@ -2268,17 +2428,21 @@ BtKey ptr;
 
 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
 {
+BtPageSet set[1];
 uint slot;
 
        // cache page for retrieval
-       if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
-               memcpy (bt->cursor, bt->page, bt->mgr->page_size);
 
-       bt->cursor_page = bt->page_no;
+       if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
+         memcpy (bt->cursor, set->page, bt->mgr->page_size);
+       else
+         return 0;
+
+       bt->cursor_page = set->page_no;
 
-       bt_unlockpage(BtLockRead, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage(BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        return slot;
 }
 
@@ -2287,9 +2451,7 @@ uint slot;
 
 uint bt_nextkey (BtDb *bt, uint slot)
 {
-BtLatchSet *set;
-BtPool *pool;
-BtPage page;
+BtPageSet set[1];
 uid right;
 
   do {
@@ -2297,7 +2459,7 @@ uid right;
        while( slot++ < bt->cursor->cnt )
          if( slotptr(bt->cursor,slot)->dead )
                continue;
-         else if( right || (slot < bt->cursor->cnt) )
+         else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
                return slot;
          else
                break;
@@ -2306,19 +2468,20 @@ uid right;
                break;
 
        bt->cursor_page = right;
-       if( pool = bt_pinpool (bt, right) )
-               page = bt_page (bt, pool, right);
+
+       if( set->pool = bt_pinpool (bt, right) )
+               set->page = bt_page (bt, set->pool, right);
        else
                return 0;
 
-       set = bt_pinlatch (bt, right);
-    bt_lockpage(BtLockRead, set);
+       set->latch = bt_pinlatch (bt, right);
+    bt_lockpage(BtLockRead, set->latch);
 
-       memcpy (bt->cursor, page, bt->mgr->page_size);
+       memcpy (bt->cursor, set->page, bt->mgr->page_size);
 
-       bt_unlockpage(BtLockRead, set);
-       bt_unpinlatch (set);
-       bt_unpinpool (pool);
+       bt_unlockpage(BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        slot = 0;
   } while( 1 );
 
@@ -2346,45 +2509,56 @@ uint bt_tod(BtDb *bt, uint slot)
 void bt_latchaudit (BtDb *bt)
 {
 ushort idx, hashidx;
-BtLatchSet *set;
+BtLatchSet *latch;
 BtPool *pool;
 BtPage page;
 uid page_no;
 
 #ifdef unix
        for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
-               set = bt->mgr->latchsets + idx;
-               if( *(ushort *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent ) {
-                       fprintf(stderr, "latchset %d locked for page %6x\n", idx, set->page_no);
-                       *(ushort *)set->readwr = 0;
-                       *(ushort *)set->access = 0;
-                       *(ushort *)set->parent = 0;
+               latch = bt->mgr->latchsets + idx;
+               if( *(uint *)latch->readwr ) {
+                       fprintf(stderr, "latchset %d r/w locked for page %.8x\n", idx, latch->page_no);
+                       *(uint *)latch->readwr = 0;
+               }
+               if( *(uint *)latch->access ) {
+                       fprintf(stderr, "latchset %d access locked for page %.8x\n", idx, latch->page_no);
+                       *(uint *)latch->access = 0;
+               }
+               if( *(uint *)latch->parent ) {
+                       fprintf(stderr, "latchset %d parent locked for page %.8x\n", idx, latch->page_no);
+                       *(uint *)latch->parent = 0;
                }
-               if( set->pin ) {
-                       fprintf(stderr, "latchset %d pinned\n", idx);
-                       set->pin = 0;
+               if( *(uint *)latch->busy ) {
+                       fprintf(stderr, "latchset %d busy locked for page %.8x\n", idx, latch->page_no);
+                       *(uint *)latch->parent = 0;
+               }
+               if( latch->pin ) {
+                       fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
+                       latch->pin = 0;
                }
        }
 
        for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
-         if( *(uint *)bt->mgr->latchmgr->table[hashidx].latch )
-               fprintf(stderr, "latchmgr locked\n");
          if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
-               set = bt->mgr->latchsets + idx;
-               if( *(uint *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent )
-                       fprintf(stderr, "latchset %d locked\n", idx);
-               if( set->hash != hashidx )
+               latch = bt->mgr->latchsets + idx;
+               if( latch->hash != hashidx ) {
                        fprintf(stderr, "latchset %d wrong hashidx\n", idx);
-               if( set->pin )
-                       fprintf(stderr, "latchset %d pinned\n", idx);
-         } while( idx = set->next );
+                       latch->hash = hashidx;
+               }
+         } while( idx = latch->next );
        }
+
        page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
 
        while( page_no ) {
                fprintf(stderr, "free: %.6x\n", (uint)page_no);
-               pool = bt_pinpool (bt, page_no);
-               page = bt_page (bt, pool, page_no);
+
+               if( pool = bt_pinpool (bt, page_no) )
+                       page = bt_page (bt, pool, page_no);
+               else
+                       return;
+
            page_no = bt_getid(page->right);
                bt_unpinpool (pool);
        }
@@ -2412,10 +2586,8 @@ uid next, page_no = LEAF_page;   // start on first page of leaves
 unsigned char key[256];
 ThreadArg *args = arg;
 int ch, len = 0, slot;
-BtLatchSet *set;
+BtPageSet set[1];
 time_t tod[1];
-BtPool *pool;
-BtPage page;
 BtKey ptr;
 BtDb *bt;
 FILE *in;
@@ -2501,38 +2673,49 @@ FILE *in;
                break;
 
        case 's':
-               len = key[0] = 0;
-
-               fprintf(stderr, "started reading\n");
-
-               if( slot = bt_startkey (bt, key, len) )
-                 slot--;
-               else
-                 fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
-
-               while( slot = bt_nextkey (bt, slot) ) {
-                       ptr = bt_key(bt, slot);
-                       fwrite (ptr->key, ptr->len, 1, stdout);
-                       fputc ('\n', stdout);
-               }
+               fprintf(stderr, "started scanning\n");
+               do {
+                       if( set->pool = bt_pinpool (bt, page_no) )
+                               set->page = bt_page (bt, set->pool, page_no);
+                       else
+                               break;
+                       set->latch = bt_pinlatch (bt, page_no);
+                       bt_lockpage (BtLockRead, set->latch);
+                       next = bt_getid (set->page->right);
+                       cnt += set->page->act;
+
+                       for( slot = 0; slot++ < set->page->cnt; )
+                        if( next || slot < set->page->cnt )
+                         if( !slotptr(set->page, slot)->dead ) {
+                               ptr = keyptr(set->page, slot);
+                               fwrite (ptr->key, ptr->len, 1, stdout);
+                               fputc ('\n', stdout);
+                         }
+
+                       bt_unlockpage (BtLockRead, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
+               } while( page_no = next );
 
+               cnt--;  // remove stopper key
+               fprintf(stderr, " Total keys read %d\n", cnt);
                break;
 
        case 'c':
-               fprintf(stderr, "started reading\n");
+               fprintf(stderr, "started counting\n");
 
                do {
-                       if( pool = bt_pinpool (bt, page_no) )
-                               page = bt_page (bt, pool, page_no);
+                       if( set->pool = bt_pinpool (bt, page_no) )
+                               set->page = bt_page (bt, set->pool, page_no);
                        else
                                break;
-                       set = bt_pinlatch (bt, page_no);
-                       bt_lockpage (BtLockRead, set);
-                       cnt += page->act;
-                       next = bt_getid (page->right);
-                       bt_unlockpage (BtLockRead, set);
-                       bt_unpinlatch (set);
-                       bt_unpinpool (pool);
+                       set->latch = bt_pinlatch (bt, page_no);
+                       bt_lockpage (BtLockRead, set->latch);
+                       cnt += set->page->act;
+                       next = bt_getid (set->page->right);
+                       bt_unlockpage (BtLockRead, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
                } while( page_no = next );
 
                cnt--;  // remove stopper key