]> pd.if.org Git - btree/blobdiff - threads2j.c
fix original problem with bt_deletekey, obviating need for btree2t.c
[btree] / threads2j.c
index e5baf7b8f452967c81357c444c1af98d755c6f49..e2faef4f45d64020b1ec629fe66cdaa15913be28 100644 (file)
@@ -1,5 +1,6 @@
 // btree version threads2j linux futex concurrency version
-// 24 JAN 2014
+//     with reworked bt_deletekey
+// 12 FEB 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -52,6 +53,7 @@ REDISTRIBUTION OF THIS SOFTWARE.
 
 #include <memory.h>
 #include <string.h>
+#include <stddef.h>
 
 typedef unsigned long long     uid;
 
@@ -149,16 +151,19 @@ typedef struct {
 //     It is immediately followed
 //     by the BtSlot array of keys.
 
-typedef struct Page {
+typedef struct BtPage_ {
        uint cnt;                                       // count of keys in page
        uint act;                                       // count of active keys
        uint min;                                       // next key offset
-       unsigned char bits;                     // page size in bits
-       unsigned char lvl:6;            // level of page
+       unsigned char bits:7;           // page size in bits
+       unsigned char free:1;           // page is on free list
+       unsigned char lvl:4;            // level of page
        unsigned char kill:1;           // page is being deleted
        unsigned char dirty:1;          // page has deleted keys
+       unsigned char posted:1;         // page fence has posted
+       unsigned char goright:1;        // page is being killed, continue to right
        unsigned char right[BtId];      // page number to right
-       BtSlot table[0];                        // array of key slots
+       unsigned char fence[256];       // page fence key
 } *BtPage;
 
 //  hash table entries
@@ -197,10 +202,19 @@ typedef struct {
 #endif
 } BtPool;
 
+//  The loadpage interface object
+
+typedef struct {
+       uid page_no;            // current page number
+       BtPage page;            // current page pointer
+       BtPool *pool;           // current page pool
+       BtLatchSet *latch;      // current page latch set
+} BtPageSet;
+
 //     structure for latch manager on ALLOC_page
 
 typedef struct {
-       struct Page alloc[2];           // next & free page_nos in right ptr
+       struct BtPage_ alloc[2];        // next & free page_nos in right ptr
        BtLatch lock[1];                        // allocation area lite latch
        ushort latchdeployed;           // highest number of latch entries deployed
        ushort nlatchpage;                      // number of latch pages at BT_latch
@@ -218,7 +232,6 @@ typedef struct {
        uint seg_bits;                          // seg size in pages in bits
        uint mode;                                      // read-write mode
 #ifdef unix
-       char *pooladvise;                       // bit maps for pool page advisements
        int idx;
 #else
        HANDLE idx;
@@ -243,11 +256,7 @@ typedef struct {
        BtPage cursor;          // cached frame for start/next (never mapped)
        BtPage frame;           // spare frame for the page split (never mapped)
        BtPage zero;            // page of zeroes to extend the file (never mapped)
-       BtPage page;            // current page mapped from file
-       uid page_no;            // current page number  
        uid cursor_page;        // current cursor page number   
-       BtLatchSet *set;        // current page latchset
-       BtPool *pool;           // current page pool
        unsigned char *mem;     // frame, cursor, page memory buffer
        int found;                      // last delete or insert was found
        int err;                        // last error
@@ -266,12 +275,15 @@ typedef enum {
 // B-Tree functions
 extern void bt_close (BtDb *bt);
 extern BtDb *bt_open (BtMgr *mgr);
-extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
-extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
+extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl);
+extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_nextkey   (BtDb *bt, uint slot);
 
+//     internal functions
+BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence);
+
 //     manager functions
 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
 void bt_mgrclose (BtMgr *mgr);
@@ -316,8 +328,8 @@ extern uint bt_tod (BtDb *bt, uint slot);
 //     one with two keys.
 
 //     Deleted keys are marked with a dead bit until
-//     page cleanup The fence key for a node is always
-//     present, even after deletion and cleanup.
+//     page cleanup. The fence key for a node is
+//     present in a special array.
 
 //  Groups of pages called segments from the btree are optionally
 //  cached with a memory mapped pool. A hash table is used to keep
@@ -332,14 +344,15 @@ extern uint bt_tod (BtDb *bt, uint slot);
 //  Page 0 is dedicated to lock for new page extensions,
 //     and chains empty pages together for reuse.
 
-//     The ParentModification lock on a node is obtained to prevent resplitting
-//     or deleting a node before its fence is posted into its upper level.
+//     The ParentModification lock on a node is obtained to serialize posting
+//     or changing the fence key for a node.
 
 //     Empty pages are chained together through the ALLOC page and reused.
 
 //     Access macros to address slot and key values from the page
+//     Page slots use 1 based indexing.
 
-#define slotptr(page, slot) (page->table + slot-1)
+#define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
 
 void bt_putid(unsigned char *dest, uid id)
@@ -728,7 +741,6 @@ uint slot;
        free (mgr->pool);
        free (mgr->hash);
        free (mgr->latch);
-       free (mgr->pooladvise);
        free (mgr);
 #else
        FlushFileBuffers(mgr->idx);
@@ -859,7 +871,6 @@ SYSTEM_INFO sysinfo[1];
        mgr->pool = calloc (poolmax, sizeof(BtPool));
        mgr->hash = calloc (hashsize, sizeof(ushort));
        mgr->latch = calloc (hashsize, sizeof(BtLatch));
-       mgr->pooladvise = calloc (poolmax, (mgr->poolmask + 8) / 8);
 #else
        mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
        mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
@@ -906,13 +917,12 @@ SYSTEM_INFO sysinfo[1];
        latchmgr->alloc->bits = mgr->page_bits;
 
        for( lvl=MIN_lvl; lvl--; ) {
-               slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3;
+               slotptr(latchmgr->alloc, 1)->off = offsetof(struct BtPage_, fence);
                bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
-               key = keyptr(latchmgr->alloc, 1);
-               key->len = 2;                   // create stopper key
-               key->key[0] = 0xff;
-               key->key[1] = 0xff;
-               latchmgr->alloc->min = mgr->page_size - 3;
+               latchmgr->alloc->fence[0] = 2;                  // create stopper key
+               latchmgr->alloc->fence[1] = 0xff;
+               latchmgr->alloc->fence[2] = 0xff;
+               latchmgr->alloc->min = mgr->page_size;
                latchmgr->alloc->lvl = lvl;
                latchmgr->alloc->cnt = 1;
                latchmgr->alloc->act = 1;
@@ -1105,12 +1115,10 @@ int flag;
 
 #ifdef unix
        flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
-       pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
+       pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED | MAP_POPULATE, bt->mgr->idx, off);
        if( pool->map == MAP_FAILED )
                return bt->err = BTERR_map;
 
-       // clear out madvise issued bits
-       memset (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8) / 8), 0, (bt->mgr->poolmask + 8)/8);
 #else
        flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
        pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
@@ -1133,17 +1141,6 @@ uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
 BtPage page;
 
        page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
-#ifdef unix
-       {
-       uint idx = subpage / 8;
-       uint bit = subpage % 8;
-
-               if( ~((bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] >> bit) & 1 ) {
-                 madvise (page, bt->mgr->page_size, MADV_WILLNEED);
-                 (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] |= 1 << bit;
-               }
-       }
-#endif
        return page;
 }
 
@@ -1351,10 +1348,8 @@ void bt_unlockpage(BtLock mode, BtLatchSet *set)
 
 uid bt_newpage(BtDb *bt, BtPage page)
 {
-BtLatchSet *set;
-BtPool *pool;
+BtPageSet set[1];
 uid new_page;
-BtPage pmap;
 int reuse;
 
        //      lock allocation page
@@ -1365,12 +1360,13 @@ int reuse;
        // else allocate empty page
 
        if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
-               if( pool = bt_pinpool (bt, new_page) )
-                       pmap = bt_page (bt, pool, new_page);
+               if( set->pool = bt_pinpool (bt, new_page) )
+                       set->page = bt_page (bt, set->pool, new_page);
                else
                        return 0;
-               bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(pmap->right));
-               bt_unpinpool (pool);
+
+               bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(set->page->right));
+               bt_unpinpool (set->pool);
                reuse = 1;
        } else {
                new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
@@ -1386,7 +1382,6 @@ int reuse;
        if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
        {
                // use zero buffer to write zeros
-               memset(bt->zero, 0, bt->mgr->page_size);
                if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
                        return bt->err = BTERR_wrt, 0;
        }
@@ -1394,13 +1389,13 @@ int reuse;
        //      bring new page into pool and copy page.
        //      this will extend the file into the new pages.
 
-       if( pool = bt_pinpool (bt, new_page) )
-               pmap = bt_page (bt, pool, new_page);
+       if( set->pool = bt_pinpool (bt, new_page) )
+               set->page = bt_page (bt, set->pool, new_page);
        else
                return 0;
 
-       memcpy(pmap, page, bt->mgr->page_size);
-       bt_unpinpool (pool);
+       memcpy(set->page, page, bt->mgr->page_size);
+       bt_unpinpool (set->pool);
 #endif
        // unlock allocation latch and return new page no
 
@@ -1410,133 +1405,161 @@ int reuse;
 
 //  find slot in page for given key at a given level
 
-int bt_findslot (BtDb *bt, unsigned char *key, uint len)
+int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
 {
-uint diff, higher = bt->page->cnt, low = 1, slot;
-uint good = 0;
+uint diff, higher = set->page->cnt, low = 1, slot;
 
-       //      make stopper key an infinite fence value
+       //        make stopper key an infinite fence value
 
-       if( bt_getid (bt->page->right) )
+       if( bt_getid (set->page->right) )
                higher++;
-       else
-               good++;
 
-       //      low is the next candidate, higher is already
-       //      tested as .ge. the given key, loop ends when they meet
+       //      low is the lowest candidate.
+       //  loop ends when they meet
+
+       //  higher is already
+       //      tested as .ge. the given key.
 
        while( diff = higher - low ) {
                slot = low + ( diff >> 1 );
-               if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
+               if( keycmp (keyptr(set->page, slot), key, len) < 0 )
                        low = slot + 1;
                else
-                       higher = slot, good++;
+                       higher = slot;
        }
 
+       if( higher <= set->page->cnt )
+               return higher;
+
+       //      if leaf page, compare against fence value
+
        //      return zero if key is on right link page
+       //      or return slot beyond last key
+
+       if( set->page->lvl || keycmp ((BtKey)set->page->fence, key, len) < 0 )
+               return 0;
 
-       return good ? higher : 0;
+       return higher;
 }
 
 //  find and load page at given level for given key
 //     leave page rd or wr locked as requested
 
-int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
+int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, uint lock)
 {
 uid page_no = ROOT_page, prevpage = 0;
-BtLatchSet *set, *prevset;
 uint drill = 0xff, slot;
+BtLatchSet *prevlatch;
 uint mode, prevmode;
 BtPool *prevpool;
 
   //  start at root of btree and drill down
 
-  bt->set = NULL;
-
   do {
        // determine lock mode of drill level
        mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
 
-       bt->set = bt_pinlatch (bt, page_no);
-       bt->page_no = page_no;
+       set->latch = bt_pinlatch (bt, page_no);
+       set->page_no = page_no;
 
        // pin page contents
 
-       if( bt->pool = bt_pinpool (bt, page_no) )
-               bt->page = bt_page (bt, bt->pool, page_no);
+       if( set->pool = bt_pinpool (bt, page_no) )
+               set->page = bt_page (bt, set->pool, page_no);
        else
                return 0;
 
        // obtain access lock using lock chaining with Access mode
 
        if( page_no > ROOT_page )
-         bt_lockpage(BtLockAccess, bt->set);
+         bt_lockpage(BtLockAccess, set->latch);
 
        //      release & unpin parent page
 
        if( prevpage ) {
-         bt_unlockpage(prevmode, prevset);
-         bt_unpinlatch (prevset);
+         bt_unlockpage(prevmode, prevlatch);
+         bt_unpinlatch (prevlatch);
          bt_unpinpool (prevpool);
          prevpage = 0;
        }
 
        // obtain read lock using lock chaining
 
-       bt_lockpage(mode, bt->set);
+       bt_lockpage(mode, set->latch);
 
        if( page_no > ROOT_page )
-         bt_unlockpage(BtLockAccess, bt->set);
+         bt_unlockpage(BtLockAccess, set->latch);
 
        // re-read and re-lock root after determining actual level of root
 
-       if( bt->page->lvl != drill) {
-               if ( bt->page_no != ROOT_page )
+       if( set->page->lvl != drill) {
+               if ( set->page_no != ROOT_page )
                        return bt->err = BTERR_struct, 0;
                        
-               drill = bt->page->lvl;
+               drill = set->page->lvl;
 
                if( lock == BtLockWrite && drill == lvl ) {
-                 bt_unlockpage(mode, bt->set);
-                 bt_unpinlatch (bt->set);
-                 bt_unpinpool (bt->pool);
+                 bt_unlockpage(mode, set->latch);
+                 bt_unpinlatch (set->latch);
+                 bt_unpinpool (set->pool);
                  continue;
                }
        }
 
+       prevpage = set->page_no;
+       prevlatch = set->latch;
+       prevpool = set->pool;
+       prevmode = mode;
+
+       //      if page is being deleted and we should continue right
+
+       if( set->page->kill && set->page->goright ) {
+               page_no = bt_getid (set->page->right);
+               continue;
+       }
+
+       //      otherwise, wait for deleted node to clear
+
+       if( set->page->kill ) {
+               bt_unlockpage(mode, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
+               page_no = ROOT_page;
+               prevpage = 0;
+               drill = 0xff;
+#ifdef unix
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
+       
        //  find key on page at this level
        //  and descend to requested level
 
-       if( !bt->page->kill && (slot = bt_findslot (bt, key, len)) ) {
+       if( slot = bt_findslot (set, key, len) ) {
          if( drill == lvl )
                return slot;
 
-         while( slotptr(bt->page, slot)->dead )
-               if( slot++ < bt->page->cnt )
+         if( slot > set->page->cnt )
+               return bt->err = BTERR_struct;
+
+         while( slotptr(set->page, slot)->dead )
+               if( slot++ < set->page->cnt )
                        continue;
-               else {
-                       page_no = bt_getid(bt->page->right);
-                       goto slideright;
-               }
+               else
+                       return bt->err = BTERR_struct, 0;
 
-         page_no = bt_getid(slotptr(bt->page, slot)->id);
+         page_no = bt_getid(slotptr(set->page, slot)->id);
          drill--;
+         continue;
        }
 
        //  or slide right into next page
-       //  (slide left from deleted page)
-
-       else
-               page_no = bt_getid(bt->page->right);
 
-       //  continue down / right using overlapping locks
-       //  to protect pages being killed or split.
+       page_no = bt_getid(set->page->right);
 
-slideright:
-       prevpage = bt->page_no;
-       prevpool = bt->pool;
-       prevset = bt->set;
-       prevmode = mode;
   } while( page_no );
 
   // return error on end of right chain
@@ -1545,118 +1568,516 @@ slideright:
   return 0;    // return error
 }
 
-//  find and delete key on page by marking delete flag bit
-//  when page becomes empty, delete it
+// drill down fixing fence values for left sibling tree
 
-BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
-{
-unsigned char lowerkey[256], higherkey[256];
-BtLatchSet *rset, *set;
-BtPool *pool, *rpool;
-uid page_no, right;
-uint slot, tod;
-BtPage rpage;
-BtKey ptr;
+//  call with set write locked
+//     return with set unlocked & unpinned.
 
-       if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
-               ptr = keyptr(bt->page, slot);
+BTERR bt_fixfences (BtDb *bt, BtPageSet *set, unsigned char *newfence)
+{
+unsigned char oldfence[256];
+BtPageSet next[1];
+int chk;
+
+  memcpy (oldfence, set->page->fence, 256);
+  next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
+
+  while( !set->page->kill && set->page->lvl ) {
+       next->latch = bt_pinlatch (bt, next->page_no);
+       bt_lockpage (BtLockParent, next->latch);
+       bt_lockpage (BtLockAccess, next->latch);
+       bt_lockpage (BtLockWrite, next->latch);
+       bt_unlockpage (BtLockAccess, next->latch);
+
+       if( next->pool = bt_pinpool (bt, next->page_no) )
+               next->page = bt_page (bt, next->pool, next->page_no);
        else
                return bt->err;
 
-       // if key is found delete it, otherwise ignore request
+       chk = keycmp ((BtKey)next->page->fence, oldfence + 1, *oldfence);
 
-       if( bt->found = !keycmp (ptr, key, len) )
-               if( bt->found = slotptr(bt->page, slot)->dead == 0 ) {
-                       slotptr(bt->page,slot)->dead = 1;
-                       if( slot < bt->page->cnt )
-                               bt->page->dirty = 1;
-                       bt->page->act--;
-               }
+       if( chk < 0 ) {
+         next->page_no = bt_getid (next->page->right);
+         bt_unlockpage (BtLockWrite, next->latch);
+         bt_unlockpage (BtLockParent, next->latch);
+         bt_unpinlatch (next->latch);
+         bt_unpinpool (next->pool);
+         continue;
+       }
 
-       // return if page is not empty, or it has no right sibling
+       if( chk > 0 )
+               return bt->err = BTERR_struct;
 
-       right = bt_getid(bt->page->right);
-       page_no = bt->page_no;
-       pool = bt->pool;
-       set = bt->set;
+       if( bt_fixfences (bt, next, newfence) )
+               return bt->err;
 
-       if( !right || bt->page->act ) {
-               bt_unlockpage(BtLockWrite, set);
-               bt_unpinlatch (set);
-               bt_unpinpool (pool);
+       break;
+  }
+
+  memcpy (set->page->fence, newfence, 256);
+
+  bt_unlockpage (BtLockWrite, set->latch);
+  bt_unlockpage (BtLockParent, set->latch);
+  bt_unpinlatch (set->latch);
+  bt_unpinpool (set->pool);
+  return 0;
+}
+
+//     return page to free list
+//     page must be delete & write locked
+
+void bt_freepage (BtDb *bt, BtPageSet *set)
+{
+       //      lock allocation page
+
+       bt_spinwritelock (bt->mgr->latchmgr->lock, 0);
+
+       //      store chain in second right
+       bt_putid(set->page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+       bt_putid(bt->mgr->latchmgr->alloc[1].right, set->page_no);
+       set->page->free = 1;
+
+       // unlock released page
+
+       bt_unlockpage (BtLockDelete, set->latch);
+       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
+
+       // unlock allocation page
+
+       bt_spinreleasewrite (bt->mgr->latchmgr->lock, 0);
+}
+
+//     remove the root level by promoting its only child
+//     call with parent and child pages
+
+BTERR bt_removeroot (BtDb *bt, BtPageSet *root, BtPageSet *child)
+{
+uid next = 0;
+
+  do {
+       if( next ) {
+         child->latch = bt_pinlatch (bt, next);
+         bt_lockpage (BtLockDelete, child->latch);
+         bt_lockpage (BtLockWrite, child->latch);
+
+         if( child->pool = bt_pinpool (bt, next) )
+               child->page = bt_page (bt, child->pool, next);
+         else
                return bt->err;
+
+         child->page_no = next;
+       }
+
+       memcpy (root->page, child->page, bt->mgr->page_size);
+       next = bt_getid (slotptr(child->page, child->page->cnt)->id);
+       bt_freepage (bt, child);
+  } while( root->page->lvl > 1 && root->page->cnt == 1 );
+
+  bt_unlockpage (BtLockWrite, root->latch);
+  bt_unpinlatch (root->latch);
+  bt_unpinpool (root->pool);
+  return 0;
+}
+
+//  pull right page over ourselves in simple merge
+
+BTERR bt_mergeright (BtDb *bt, BtPageSet *set, BtPageSet *parent, BtPageSet *right, uint slot, uint idx)
+{
+       //  install ourselves as child page
+       //      and delete ourselves from parent
+
+       bt_putid (slotptr(parent->page, idx)->id, set->page_no);
+       slotptr(parent->page, slot)->dead = 1;
+       parent->page->act--;
+
+       //      collapse any empty slots
+
+       while( idx = parent->page->cnt - 1 )
+         if( slotptr(parent->page, idx)->dead ) {
+               *slotptr(parent->page, idx) = *slotptr(parent->page, idx + 1);
+               memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
+         } else
+                 break;
+
+       memcpy (set->page, right->page, bt->mgr->page_size);
+       bt_unlockpage (BtLockParent, right->latch);
+
+       bt_freepage (bt, right);
+
+       //      do we need to remove a btree level?
+       //      (leave the first page of leaves alone)
+
+       if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
+         if( set->page->lvl )
+               return bt_removeroot (bt, parent, set);
+
+       bt_unlockpage (BtLockWrite, parent->latch);
+       bt_unlockpage (BtLockDelete, set->latch);
+       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (parent->latch);
+       bt_unpinpool (parent->pool);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
+       return 0;
+}
+
+//     remove both child and parent from the btree
+//     from the fence position in the parent
+//     call with both pages locked for writing
+
+BTERR bt_removeparent (BtDb *bt, BtPageSet *child, BtPageSet *parent, BtPageSet *right, BtPageSet *rparent, uint lvl)
+{
+unsigned char pagefence[256];
+uint idx;
+
+       //  pull right sibling over ourselves and unlock
+
+       memcpy (child->page, right->page, bt->mgr->page_size);
+
+       bt_unlockpage (BtLockWrite, child->latch);
+       bt_unpinlatch (child->latch);
+       bt_unpinpool (child->pool);
+
+       //  install ourselves into right link of old right page
+
+       bt_putid (right->page->right, child->page_no);
+       right->page->goright = 1;       // tell bt_loadpage to go right to us
+       right->page->kill = 1;
+
+       bt_unlockpage (BtLockWrite, right->latch);
+
+       //      remove our slot from our parent
+       //      signal to move right
+
+       parent->page->goright = 1;      // tell bt_loadpage to go right to rparent
+       parent->page->kill = 1;
+       parent->page->act--;
+
+       //      redirect right page pointer in right parent to us
+
+       for( idx = 0; idx++ < rparent->page->cnt; )
+         if( !slotptr(rparent->page, idx)->dead )
+               break;
+
+       if( bt_getid (slotptr(rparent->page, idx)->id) != right->page_no )
+               return bt->err = BTERR_struct;
+
+       bt_putid (slotptr(rparent->page, idx)->id, child->page_no);
+       bt_unlockpage (BtLockWrite, rparent->latch);
+       bt_unpinlatch (rparent->latch);
+       bt_unpinpool (rparent->pool);
+
+       //      free the right page
+
+       bt_lockpage (BtLockDelete, right->latch);
+       bt_lockpage (BtLockWrite, right->latch);
+       bt_freepage (bt, right);
+
+       //      save parent page fence value
+
+       memcpy (pagefence, parent->page->fence, 256);
+       bt_unlockpage (BtLockWrite, parent->latch);
+
+       return bt_removepage (bt, parent, lvl, pagefence);
+}
+
+//     remove page from btree
+//     call with page unlocked
+//     returns with page on free list
+
+BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence)
+{
+BtPageSet parent[1], sibling[1], rparent[1];
+unsigned char newfence[256];
+uint slot, idx;
+BtKey ptr;
+
+       //      load and lock our parent
+
+  while( 1 ) {
+       if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
+               return bt->err;
+
+       //      do we show up in our parent yet?
+
+       if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) {
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
+
+       //      can we do a simple merge entirely
+       //      between siblings on the parent page?
+
+       if( slot < parent->page->cnt ) {
+               // find our right neighbor
+               //      right must exist because the stopper prevents
+               //      the rightmost page from deleting
+
+               for( idx = slot; idx++ < parent->page->cnt; )
+                 if( !slotptr(parent->page, idx)->dead )
+                       break;
+
+               sibling->page_no = bt_getid (slotptr (parent->page, idx)->id);
+
+               bt_lockpage (BtLockDelete, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
+
+               //      merge right if sibling shows up in
+               //  our parent and is not being killed
+
+               if( sibling->page_no == bt_getid (set->page->right) ) {
+                 sibling->latch = bt_pinlatch (bt, sibling->page_no);
+                 bt_lockpage (BtLockParent, sibling->latch);
+                 bt_lockpage (BtLockDelete, sibling->latch);
+                 bt_lockpage (BtLockWrite, sibling->latch);
+
+                 if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+                       sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+                 else
+                       return bt->err;
+
+                 if( !sibling->page->kill )
+                       return bt_mergeright(bt, set, parent, sibling, slot, idx);
+
+                 //  try again later
+
+                 bt_unlockpage (BtLockWrite, sibling->latch);
+                 bt_unlockpage (BtLockParent, sibling->latch);
+                 bt_unlockpage (BtLockDelete, sibling->latch);
+                 bt_unpinlatch (sibling->latch);
+                 bt_unpinpool (sibling->pool);
+               }
+
+               bt_unlockpage (BtLockDelete, set->latch);
+               bt_unlockpage (BtLockWrite, set->latch);
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
        }
 
-       // obtain Parent lock over write lock
+       //  find our left neighbor in our parent page
+
+       for( idx = slot; --idx; )
+         if( !slotptr(parent->page, idx)->dead )
+               break;
+
+       //      if no left neighbor, delete ourselves and our parent
+
+       if( !idx ) {
+               bt_lockpage (BtLockAccess, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
+               bt_unlockpage (BtLockAccess, set->latch);
+
+               rparent->page_no = bt_getid (parent->page->right);
+               rparent->latch = bt_pinlatch (bt, rparent->page_no);
+
+               bt_lockpage (BtLockAccess, rparent->latch);
+               bt_lockpage (BtLockWrite, rparent->latch);
+               bt_unlockpage (BtLockAccess, rparent->latch);
+
+               if( rparent->pool = bt_pinpool (bt, rparent->page_no) )
+                       rparent->page = bt_page (bt, rparent->pool, rparent->page_no);
+               else
+                       return bt->err;
+
+               if( !rparent->page->kill ) {
+                 sibling->page_no = bt_getid (set->page->right);
+                 sibling->latch = bt_pinlatch (bt, sibling->page_no);
+
+                 bt_lockpage (BtLockAccess, sibling->latch);
+                 bt_lockpage (BtLockWrite, sibling->latch);
+                 bt_unlockpage (BtLockAccess, sibling->latch);
+
+                 if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+                       sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+                 else
+                       return bt->err;
+
+                 if( !sibling->page->kill )
+                       return bt_removeparent (bt, set, parent, sibling, rparent, lvl+1);
+
+                 //  try again later
+
+                 bt_unlockpage (BtLockWrite, sibling->latch);
+                 bt_unpinlatch (sibling->latch);
+                 bt_unpinpool (sibling->pool);
+               }
+
+               bt_unlockpage (BtLockWrite, set->latch);
+               bt_unlockpage (BtLockWrite, rparent->latch);
+               bt_unpinlatch (rparent->latch);
+               bt_unpinpool (rparent->pool);
+
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
 
-       bt_lockpage(BtLockParent, set);
+       // redirect parent to our left sibling
+       // lock and map our left sibling's page
 
-       // keep copy of key to delete
+       sibling->page_no = bt_getid (slotptr(parent->page, idx)->id);
+       sibling->latch = bt_pinlatch (bt, sibling->page_no);
 
-       ptr = keyptr(bt->page, bt->page->cnt);
-       memcpy(lowerkey, ptr, ptr->len + 1);
+       //      wait our turn on fence key maintenance
 
-       // lock and map right page
+       bt_lockpage(BtLockParent, sibling->latch);
+       bt_lockpage(BtLockAccess, sibling->latch);
+       bt_lockpage(BtLockWrite, sibling->latch);
+       bt_unlockpage(BtLockAccess, sibling->latch);
 
-       if( rpool = bt_pinpool (bt, right) )
-               rpage = bt_page (bt, rpool, right);
+       if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+               sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
        else
                return bt->err;
 
-       rset = bt_pinlatch (bt, right);
-       bt_lockpage(BtLockWrite, rset);
+       //  wait until left sibling is in our parent
 
-       // pull contents of next page into current empty page 
+       if( bt_getid (sibling->page->right) != set->page_no ) {
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unlockpage (BtLockWrite, sibling->latch);
+               bt_unlockpage (BtLockParent, sibling->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+               bt_unpinlatch (sibling->latch);
+               bt_unpinpool (sibling->pool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
+
+       //      delete our left sibling from parent
+
+       slotptr(parent->page,idx)->dead = 1;
+       parent->page->dirty = 1;
+       parent->page->act--;
+
+       //      redirect our parent slot to our left sibling
 
-       memcpy (bt->page, rpage, bt->mgr->page_size);
+       bt_putid (slotptr(parent->page, slot)->id, sibling->page_no);
+       memcpy (sibling->page->right, set->page->right, BtId);
 
-       //      keep copy of key to update
+       //      collapse dead slots from parent
 
-       ptr = keyptr(rpage, rpage->cnt);
-       memcpy(higherkey, ptr, ptr->len + 1);
+       while( idx = parent->page->cnt - 1 )
+         if( slotptr(parent->page, idx)->dead ) {
+               *slotptr(parent->page, idx) = *slotptr(parent->page, parent->page->cnt);
+               memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
+         } else
+                 break;
 
-       //  Mark right page as deleted and point it to left page
-       //      until we can post updates at higher level.
+       //  free our original page
 
-       bt_putid(rpage->right, page_no);
-       rpage->kill = 1;
-       rpage->cnt = 0;
+       bt_lockpage (BtLockDelete, set->latch);
+       bt_lockpage (BtLockWrite, set->latch);
+       bt_freepage (bt, set);
 
-       bt_unlockpage(BtLockWrite, rset);
-       bt_unlockpage(BtLockWrite, set);
+       //      go down the left node's fence keys to the leaf level
+       //      and update the fence keys in each page
 
-       //  delete old lower key to consolidated node
+       memcpy (newfence, parent->page->fence, 256);
 
-       if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
+       if( bt_fixfences (bt, sibling, newfence) )
                return bt->err;
 
-       //  redirect higher key directly to consolidated node
+       //  promote sibling as new root?
 
-       tod = (uint)time(NULL);
+       if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
+        if( sibling->page->lvl ) {
+         sibling->latch = bt_pinlatch (bt, sibling->page_no);
+         bt_lockpage (BtLockDelete, sibling->latch);
+         bt_lockpage (BtLockWrite, sibling->latch);
 
-       if( bt_insertkey (bt, higherkey+1, *higherkey, lvl + 1, page_no, tod) )
+         if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
+               sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
+         else
                return bt->err;
 
-       //      add killed right block to free chain
-       //      lock latch mgr
+         return bt_removeroot (bt, parent, sibling);
+        }
 
-       bt_spinwritelock(bt->mgr->latchmgr->lock, 0);
+       bt_unlockpage (BtLockWrite, parent->latch);
+       bt_unpinlatch (parent->latch);
+       bt_unpinpool (parent->pool);
+
+       return 0;
+  }
+}
 
-       //      store free chain in allocation page second right
-       bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
-       bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+//  find and delete key on page by marking delete flag bit
+//  if page becomes empty, delete it from the btree
+
+BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
+{
+unsigned char pagefence[256];
+uint slot, idx, found;
+BtPageSet set[1];
+BtKey ptr;
 
-       // unlock latch mgr and unpin right page
+       if( slot = bt_loadpage (bt, set, key, len, 0, BtLockWrite) )
+               ptr = keyptr(set->page, slot);
+       else
+               return bt->err;
 
-       bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0);
-       bt_unpinlatch (rset);
-       bt_unpinpool (rpool);
+       // if key is found delete it, otherwise ignore request
+
+       if( found = slot <= set->page->cnt )
+         if( found = !keycmp (ptr, key, len) )
+               if( found = slotptr(set->page, slot)->dead == 0 ) {
+                 slotptr(set->page,slot)->dead = 1;
+                 set->page->dirty = 1;
+                 set->page->act--;
+
+                 // collapse empty slots
+
+                 while( idx = set->page->cnt - 1 )
+                       if( slotptr(set->page, idx)->dead ) {
+                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
+                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
+                       } else
+                               break;
+               }
+
+       if( set->page->act ) {
+               bt_unlockpage(BtLockWrite, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
+               return bt->found = found, 0;
+       }
+
+       memcpy (pagefence, set->page->fence, 256);
+       set->page->kill = 1;
+
+       bt_unlockpage (BtLockWrite, set->latch);
 
-       //      remove ParentModify lock
+       if( bt_removepage (bt, set, 0, pagefence) )
+               return bt->err;
 
-       bt_unlockpage(BtLockParent, set);
-       bt_unpinlatch (set);
-       bt_unpinpool (pool);
+       bt->found = found;
        return 0;
 }
 
@@ -1664,41 +2085,40 @@ BtKey ptr;
 
 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
 {
+BtPageSet set[1];
 uint  slot;
+uid id = 0;
 BtKey ptr;
-uid id;
 
-       if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
-               ptr = keyptr(bt->page, slot);
+       if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
+               ptr = keyptr(set->page, slot);
        else
                return 0;
 
        // if key exists, return row-id
        //      otherwise return 0
 
-       if( ptr->len == len && !memcmp (ptr->key, key, len) )
-               id = bt_getid(slotptr(bt->page,slot)->id);
-       else
-               id = 0;
+       if( slot <= set->page->cnt )
+         if( !keycmp (ptr, key, len) )
+               id = bt_getid(slotptr(set->page,slot)->id);
 
-       bt_unlockpage (BtLockRead, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage (BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        return id;
 }
 
 //     check page for space available,
 //     clean if necessary and return
-//     =0 - page needs splitting
-//     >0 - go ahead at returned slot
+//     0 - page needs splitting
+//     >0  new slot value
 
-uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
+uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
 {
-uint nxt = bt->mgr->page_size;
-BtPage page = bt->page;
+uint nxt = bt->mgr->page_size, off;
 uint cnt = 0, idx = 0;
 uint max = page->cnt;
-uint newslot;
+uint newslot = max;
 BtKey key;
 
        if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
@@ -1717,29 +2137,39 @@ BtKey key;
        page->dirty = 0;
        page->act = 0;
 
-       // always leave fence key in list
+       // try cleaning up page first
+       // by removing deleted keys
 
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 1;
-               else if( cnt < max && slotptr(bt->frame,cnt)->dead )
+               if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
-               // copy key
-               key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)page + nxt, key, key->len + 1);
+               // if its not the fence key,
+               // copy the key across
+
+               off = slotptr(bt->frame,cnt)->off;
+
+               if( off >= sizeof(*page) ) {
+                       key = keyptr(bt->frame, cnt);
+                       off = nxt -= key->len + 1;
+                       memcpy ((unsigned char *)page + nxt, key, key->len + 1);
+               }
 
                // copy slot
+
                memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
-               if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
-                       page->act++;
                slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
-               slotptr(page, idx)->off = nxt;
+               slotptr(page, idx)->off = off;
+               page->act++;
        }
+
        page->min = nxt;
        page->cnt = idx;
 
+       //      see if page has enough space now, or does it need splitting?
+
        if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
                return newslot;
 
@@ -1748,94 +2178,93 @@ BtKey key;
 
 // split the root and raise the height of the btree
 
-BTERR bt_splitroot(BtDb *bt,  unsigned char *newkey, unsigned char *oldkey, uid page_no2)
+BTERR bt_splitroot(BtDb *bt, BtPageSet *root, uid page_no2)
 {
 uint nxt = bt->mgr->page_size;
-BtPage root = bt->page;
+unsigned char leftkey[256];
 uid new_page;
 
        //  Obtain an empty page to use, and copy the current
-       //  root contents into it which is the lower half of
-       //      the old root.
+       //  root contents into it, e.g. lower keys
+
+       memcpy (leftkey, root->page->fence, 256);
+       root->page->posted = 1;
 
-       if( !(new_page = bt_newpage(bt, root)) )
+       if( !(new_page = bt_newpage(bt, root->page)) )
                return bt->err;
 
        // preserve the page info at the bottom
-       // and set rest to zero
+       // of higher keys and set rest to zero
 
-       memset(root+1, 0, bt->mgr->page_size - sizeof(*root));
+       memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
+       memset(root->page->fence, 0, 256);
+       root->page->fence[0] = 2;
+       root->page->fence[1] = 0xff;
+       root->page->fence[2] = 0xff;
 
-       // insert first key on newroot page
+       // insert lower keys page fence key on newroot page
 
-       nxt -= *newkey + 1;
-       memcpy ((unsigned char *)root + nxt, newkey, *newkey + 1);
-       bt_putid(slotptr(root, 1)->id, new_page);
-       slotptr(root, 1)->off = nxt;
+       nxt -= *leftkey + 1;
+       memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
+       bt_putid(slotptr(root->page, 1)->id, new_page);
+       slotptr(root->page, 1)->off = nxt;
        
-       // insert second key on newroot page
+       // insert stopper key on newroot page
        // and increase the root height
 
-       nxt -= *oldkey + 1;
-       memcpy ((unsigned char *)root + nxt, oldkey, *oldkey + 1);
-       bt_putid(slotptr(root, 2)->id, page_no2);
-       slotptr(root, 2)->off = nxt;
+       bt_putid(slotptr(root->page, 2)->id, page_no2);
+       slotptr(root->page, 2)->off = offsetof(struct BtPage_, fence);
 
-       bt_putid(root->right, 0);
-       root->min = nxt;                // reset lowest used offset and key count
-       root->cnt = 2;
-       root->act = 2;
-       root->lvl++;
+       bt_putid(root->page->right, 0);
+       root->page->min = nxt;          // reset lowest used offset and key count
+       root->page->cnt = 2;
+       root->page->act = 2;
+       root->page->lvl++;
 
-       // release and unpin root (bt->page)
+       // release and unpin root
 
-       bt_unlockpage(BtLockWrite, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+       bt_unlockpage(BtLockWrite, root->latch);
+       bt_unpinlatch (root->latch);
+       bt_unpinpool (root->pool);
        return 0;
 }
 
 //  split already locked full node
 //     return unlocked.
 
-BTERR bt_splitpage (BtDb *bt)
+BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
 {
-uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
-unsigned char oldkey[256], lowerkey[256];
-uid page_no = bt->page_no, right;
-BtLatchSet *nset, *set = bt->set;
-BtPool *pool = bt->pool;
-BtPage page = bt->page;
-uint lvl = page->lvl;
-uid new_page;
+uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size, off;
+unsigned char fencekey[256];
+uint lvl = set->page->lvl;
+uid right;
 BtKey key;
-uint tod;
 
        //  split higher half of keys to bt->frame
-       //      the last key (fence key) might be dead
-
-       tod = (uint)time(NULL);
 
        memset (bt->frame, 0, bt->mgr->page_size);
-       max = (int)page->cnt;
+       max = set->page->cnt;
        cnt = max / 2;
        idx = 0;
 
        while( cnt++ < max ) {
-               key = keyptr(page, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
-               memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
-               if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
-                       bt->frame->act++;
-               slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
-               slotptr(bt->frame, idx)->off = nxt;
+               if( !lvl || cnt < max ) {
+                       key = keyptr(set->page, cnt);
+                       off = nxt -= key->len + 1;
+                       memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
+               } else
+                       off = offsetof(struct BtPage_, fence);
+
+               memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId);
+               slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod;
+               slotptr(bt->frame, idx)->off = off;
+               bt->frame->act++;
        }
 
-       // remember existing fence key for new page to the right
-
-       memcpy (oldkey, key, key->len + 1);
+       if( set->page_no == ROOT_page )
+               bt->frame->posted = 1;
 
+       memcpy (bt->frame->fence, set->page->fence, 256);
        bt->frame->bits = bt->mgr->page_bits;
        bt->frame->min = nxt;
        bt->frame->cnt = idx;
@@ -1843,170 +2272,191 @@ uint tod;
 
        // link right node
 
-       if( page_no > ROOT_page ) {
-               right = bt_getid (page->right);
-               bt_putid(bt->frame->right, right);
-       }
+       if( set->page_no > ROOT_page )
+               memcpy (bt->frame->right, set->page->right, BtId);
 
-       //      get new free page and write frame to it.
+       //      get new free page and write higher keys to it.
 
-       if( !(new_page = bt_newpage(bt, bt->frame)) )
+       if( !(right = bt_newpage(bt, bt->frame)) )
                return bt->err;
 
        //      update lower keys to continue in old page
 
-       memcpy (bt->frame, page, bt->mgr->page_size);
-       memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
+       memcpy (bt->frame, set->page, bt->mgr->page_size);
+       memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
        nxt = bt->mgr->page_size;
-       page->act = 0;
+       set->page->posted = 0;
+       set->page->dirty = 0;
+       set->page->act = 0;
        cnt = 0;
        idx = 0;
 
        //  assemble page of smaller keys
-       //      (they're all active keys)
 
        while( cnt++ < max / 2 ) {
                key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)page + nxt, key, key->len + 1);
-               memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
-               slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
-               slotptr(page, idx)->off = nxt;
-               page->act++;
+
+               if( !lvl || cnt < max / 2 ) {
+                       off = nxt -= key->len + 1;
+                       memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
+               } else 
+                       off = offsetof(struct BtPage_, fence);
+
+               memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
+               slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod;
+               slotptr(set->page, idx)->off = off;
+               set->page->act++;
        }
 
-       // remember fence key for old page
+       // install fence key for smaller key page
 
-       memcpy(lowerkey, key, key->len + 1);
-       bt_putid(page->right, new_page);
-       page->min = nxt;
-       page->cnt = idx;
+       memset(set->page->fence, 0, 256);
+       memcpy(set->page->fence, key, key->len + 1);
+
+       bt_putid(set->page->right, right);
+       set->page->min = nxt;
+       set->page->cnt = idx;
 
        // if current page is the root page, split it
 
-       if( page_no == ROOT_page )
-               return bt_splitroot (bt, lowerkey, oldkey, new_page);
+       if( set->page_no == ROOT_page )
+               return bt_splitroot (bt, set, right);
 
-       // obtain Parent/Write locks
-       // for left and right node pages
+       bt_unlockpage (BtLockWrite, set->latch);
 
-       nset = bt_pinlatch (bt, new_page);
+       // insert new fences in their parent pages
 
-       bt_lockpage (BtLockParent, nset);
-       bt_lockpage (BtLockParent, set);
+       while( 1 ) {
+               bt_lockpage (BtLockParent, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
 
-       //  release wr lock on left page
-       //  (keep the SMO in sequence)
+               memcpy (fencekey, set->page->fence, 256);
+               right = bt_getid (set->page->right);
 
-       bt_unlockpage (BtLockWrite, set);
+               if( set->page->posted ) {
+                       bt_unlockpage (BtLockParent, set->latch);
+                       bt_unlockpage (BtLockWrite, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
+                       return 0;
+               }
 
-       // insert new fence for reformulated left block
+               set->page->posted = 1;
+               bt_unlockpage (BtLockWrite, set->latch);
 
-       if( bt_insertkey (bt, lowerkey+1, *lowerkey, lvl + 1, page_no, tod) )
-               return bt->err;
+               if( bt_insertkey (bt, fencekey+1, *fencekey, set->page_no, time(NULL), lvl+1) )
+                       return bt->err;
 
-       // fix old fence for newly allocated right block page
+               bt_unlockpage (BtLockParent, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
 
-       if( bt_insertkey (bt, oldkey+1, *oldkey, lvl + 1, new_page, tod) )
-               return bt->err;
+               if( !(set->page_no = right) )
+                       break;
 
-       // release Parent locks
+               set->latch = bt_pinlatch (bt, right);
+
+               if( set->pool = bt_pinpool (bt, right) )
+                       set->page = bt_page (bt, set->pool, right);
+               else
+                       return bt->err;
+       }
 
-       bt_unlockpage (BtLockParent, nset);
-       bt_unlockpage (BtLockParent, set);
-       bt_unpinlatch (nset);
-       bt_unpinlatch (set);
-       bt_unpinpool (pool);
        return 0;
 }
 
-//  Insert new key into the btree at requested level.
-//  Level zero pages are leaf pages. Page is unlocked at exit.
+//  Insert new key into the btree at given level.
 
-BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
+BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
 {
+BtPageSet set[1];
 uint slot, idx;
-BtPage page;
 BtKey ptr;
 
-  while( 1 ) {
-       if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
-               ptr = keyptr(bt->page, slot);
-       else
-       {
-               if ( !bt->err )
-                       bt->err = BTERR_ovflw;
-               return bt->err;
-       }
-
-       // if key already exists, update id and return
-
-       page = bt->page;
+       while( 1 ) {
+               if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
+                       ptr = keyptr(set->page, slot);
+               else
+               {
+                       if ( !bt->err )
+                               bt->err = BTERR_ovflw;
+                       return bt->err;
+               }
 
-       if( bt->found = !keycmp (ptr, key, len) ) {
-               slotptr(page, slot)->dead = 0;
-               slotptr(page, slot)->tod = tod;
-               bt_putid(slotptr(page,slot)->id, id);
-               bt_unlockpage(BtLockWrite, bt->set);
-               bt_unpinlatch(bt->set);
-               bt_unpinpool (bt->pool);
-               return bt->err;
-       }
+               // if key already exists, update id and return
+
+               if( slot <= set->page->cnt )
+                 if( !keycmp (ptr, key, len) ) {
+                       if( slotptr(set->page, slot)->dead )
+                               set->page->act++;
+                       slotptr(set->page, slot)->dead = 0;
+                       slotptr(set->page, slot)->tod = tod;
+                       bt_putid(slotptr(set->page,slot)->id, id);
+                       bt_unlockpage(BtLockWrite, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
+                       return 0;
+               }
 
-       // check if page has enough space
+               // check if page has enough space
 
-       if( slot = bt_cleanpage (bt, len, slot) )
-               break;
+               if( slot = bt_cleanpage (bt, set->page, len, slot) )
+                       break;
 
-       if( bt_splitpage (bt) )
-               return bt->err;
-  }
+               if( bt_splitpage (bt, set) )
+                       return bt->err;
+       }
 
-  // calculate next available slot and copy key into page
+       // calculate next available slot and copy key into page
 
-  page->min -= len + 1; // reset lowest used offset
-  ((unsigned char *)page)[page->min] = len;
-  memcpy ((unsigned char *)page + page->min +1, key, len );
+       set->page->min -= len + 1; // reset lowest used offset
+       ((unsigned char *)set->page)[set->page->min] = len;
+       memcpy ((unsigned char *)set->page + set->page->min +1, key, len );
 
-  for( idx = slot; idx < page->cnt; idx++ )
-       if( slotptr(page, idx)->dead )
+       for( idx = slot; idx <= set->page->cnt; idx++ )
+         if( slotptr(set->page, idx)->dead )
                break;
 
-  // now insert key into array before slot
-  // preserving the fence slot
+       // now insert key into array before slot
 
-  if( idx == page->cnt )
-       idx++, page->cnt++;
+       if( idx > set->page->cnt )
+               set->page->cnt++;
 
-  page->act++;
+       set->page->act++;
 
-  while( idx > slot )
-       *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
+       while( idx > slot )
+               *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
 
-  bt_putid(slotptr(page,slot)->id, id);
-  slotptr(page, slot)->off = page->min;
-  slotptr(page, slot)->tod = tod;
-  slotptr(page, slot)->dead = 0;
+       bt_putid(slotptr(set->page,slot)->id, id);
+       slotptr(set->page, slot)->off = set->page->min;
+       slotptr(set->page, slot)->tod = tod;
+       slotptr(set->page, slot)->dead = 0;
 
-  bt_unlockpage (BtLockWrite, bt->set);
-  bt_unpinlatch (bt->set);
-  bt_unpinpool (bt->pool);
-  return 0;
+       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
+       return 0;
 }
 
 //  cache page of keys into cursor and return starting slot for given key
 
 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
 {
+BtPageSet set[1];
 uint slot;
 
        // cache page for retrieval
-       if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
-               memcpy (bt->cursor, bt->page, bt->mgr->page_size);
-       bt->cursor_page = bt->page_no;
-       bt_unlockpage(BtLockRead, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (bt->pool);
+
+       if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
+         memcpy (bt->cursor, set->page, bt->mgr->page_size);
+       else
+         return 0;
+
+       bt->cursor_page = set->page_no;
+
+       bt_unlockpage(BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        return slot;
 }
 
@@ -2015,8 +2465,7 @@ uint slot;
 
 uint bt_nextkey (BtDb *bt, uint slot)
 {
-BtPool *pool;
-BtPage page;
+BtPageSet set[1];
 uid right;
 
   do {
@@ -2024,7 +2473,7 @@ uid right;
        while( slot++ < bt->cursor->cnt )
          if( slotptr(bt->cursor,slot)->dead )
                continue;
-         else if( right || (slot < bt->cursor->cnt))
+         else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
                return slot;
          else
                break;
@@ -2034,19 +2483,19 @@ uid right;
 
        bt->cursor_page = right;
 
-       if( pool = bt_pinpool (bt, right) )
-               page = bt_page (bt, pool, right);
+       if( set->pool = bt_pinpool (bt, right) )
+               set->page = bt_page (bt, set->pool, right);
        else
                return 0;
 
-       bt->set = bt_pinlatch (bt, right);
-    bt_lockpage(BtLockRead, bt->set);
+       set->latch = bt_pinlatch (bt, right);
+    bt_lockpage(BtLockRead, set->latch);
 
-       memcpy (bt->cursor, page, bt->mgr->page_size);
+       memcpy (bt->cursor, set->page, bt->mgr->page_size);
 
-       bt_unlockpage(BtLockRead, bt->set);
-       bt_unpinlatch (bt->set);
-       bt_unpinpool (pool);
+       bt_unlockpage(BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
        slot = 0;
   } while( 1 );
 
@@ -2068,50 +2517,62 @@ uint bt_tod(BtDb *bt, uint slot)
        return slotptr(bt->cursor,slot)->tod;
 }
 
+
 #ifdef STANDALONE
 
 void bt_latchaudit (BtDb *bt)
 {
 ushort idx, hashidx;
-BtLatchSet *set;
+BtLatchSet *latch;
 BtPool *pool;
 BtPage page;
 uid page_no;
 
 #ifdef unix
        for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
-               set = bt->mgr->latchsets + idx;
-               if( *(ushort *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent ) {
-                       fprintf(stderr, "latchset %d locked for page %6x\n", idx, set->page_no);
-                       *(ushort *)set->readwr = 0;
-                       *(ushort *)set->access = 0;
-                       *(ushort *)set->parent = 0;
+               latch = bt->mgr->latchsets + idx;
+               if( *(uint *)latch->readwr ) {
+                       fprintf(stderr, "latchset %d r/w locked for page %.8x\n", idx, latch->page_no);
+                       *(uint *)latch->readwr = 0;
+               }
+               if( *(uint *)latch->access ) {
+                       fprintf(stderr, "latchset %d access locked for page %.8x\n", idx, latch->page_no);
+                       *(uint *)latch->access = 0;
+               }
+               if( *(uint *)latch->parent ) {
+                       fprintf(stderr, "latchset %d parent locked for page %.8x\n", idx, latch->page_no);
+                       *(uint *)latch->parent = 0;
                }
-               if( set->pin ) {
-                       fprintf(stderr, "latchset %d pinned\n", idx);
-                       set->pin = 0;
+               if( *(uint *)latch->busy ) {
+                       fprintf(stderr, "latchset %d busy locked for page %.8x\n", idx, latch->page_no);
+                       *(uint *)latch->parent = 0;
+               }
+               if( latch->pin ) {
+                       fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
+                       latch->pin = 0;
                }
        }
 
        for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
-         if( *(uint *)bt->mgr->latchmgr->table[hashidx].latch )
-               fprintf(stderr, "latchmgr locked\n");
          if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
-               set = bt->mgr->latchsets + idx;
-               if( *(uint *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent )
-                       fprintf(stderr, "latchset %d locked\n", idx);
-               if( set->hash != hashidx )
+               latch = bt->mgr->latchsets + idx;
+               if( latch->hash != hashidx ) {
                        fprintf(stderr, "latchset %d wrong hashidx\n", idx);
-               if( set->pin )
-                       fprintf(stderr, "latchset %d pinned\n", idx);
-         } while( idx = set->next );
+                       latch->hash = hashidx;
+               }
+         } while( idx = latch->next );
        }
+
        page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
 
        while( page_no ) {
                fprintf(stderr, "free: %.6x\n", (uint)page_no);
-               pool = bt_pinpool (bt, page_no);
-               page = bt_page (bt, pool, page_no);
+
+               if( pool = bt_pinpool (bt, page_no) )
+                       page = bt_page (bt, pool, page_no);
+               else
+                       return;
+
            page_no = bt_getid(page->right);
                bt_unpinpool (pool);
        }
@@ -2139,9 +2600,8 @@ uid next, page_no = LEAF_page;    // start on first page of leaves
 unsigned char key[256];
 ThreadArg *args = arg;
 int ch, len = 0, slot;
+BtPageSet set[1];
 time_t tod[1];
-BtPool *pool;
-BtPage page;
 BtKey ptr;
 BtDb *bt;
 FILE *in;
@@ -2171,7 +2631,7 @@ FILE *in;
                          else if( args->num )
                                sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
 
-                         if( bt_insertkey (bt, key, len, 0, line, *tod) )
+                         if( bt_insertkey (bt, key, len, line, *tod, 0) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
@@ -2193,7 +2653,7 @@ FILE *in;
                          else if( args->num )
                                sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
 
-                         if( bt_deletekey (bt, key, len, 0) )
+                         if( bt_deletekey (bt, key, len) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
@@ -2227,40 +2687,48 @@ FILE *in;
                break;
 
        case 's':
-               len = key[0] = 0;
-
-               fprintf(stderr, "started reading\n");
-
-               if( slot = bt_startkey (bt, key, len) )
-                 slot--;
-               else
-                 fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
-
-               while( slot = bt_nextkey (bt, slot) ) {
-                       ptr = bt_key(bt, slot);
-                       fwrite (ptr->key, ptr->len, 1, stdout);
-                       fputc ('\n', stdout);
-               }
-
-               break;
-
-       case 'c':
-               fprintf(stderr, "started reading\n");
-
+               fprintf(stderr, "started scanning\n");
                do {
-                       if( bt->pool = bt_pinpool (bt, page_no) )
-                               page = bt_page (bt, bt->pool, page_no);
+                       if( set->pool = bt_pinpool (bt, page_no) )
+                               set->page = bt_page (bt, set->pool, page_no);
                        else
                                break;
-                       bt->set = bt_pinlatch (bt, page_no);
-                       bt_lockpage (BtLockRead, bt->set);
-                       cnt += page->act;
-                       next = bt_getid (page->right);
-                       bt_unlockpage (BtLockRead, bt->set);
-                       bt_unpinlatch (bt->set);
-                       bt_unpinpool (bt->pool);
+                       set->latch = bt_pinlatch (bt, page_no);
+                       bt_lockpage (BtLockRead, set->latch);
+                       next = bt_getid (set->page->right);
+                       cnt += set->page->act;
+
+                       for( slot = 0; slot++ < set->page->cnt; )
+                        if( next || slot < set->page->cnt )
+                         if( !slotptr(set->page, slot)->dead ) {
+                               ptr = keyptr(set->page, slot);
+                               fwrite (ptr->key, ptr->len, 1, stdout);
+                               fputc ('\n', stdout);
+                         }
+
+                       bt_unlockpage (BtLockRead, set->latch);
+                       bt_unpinlatch (set->latch);
+                       bt_unpinpool (set->pool);
                } while( page_no = next );
 
+               cnt--;  // remove stopper key
+               fprintf(stderr, " Total keys read %d\n", cnt);
+               break;
+
+       case 'c':
+               fprintf(stderr, "started counting\n");
+               next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
+               page_no = LEAF_page;
+
+               while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
+                       pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, page_no << bt->mgr->page_bits);
+                       if( !bt->frame->free && !bt->frame->lvl )
+                               cnt += bt->frame->act;
+                       if( page_no > LEAF_page )
+                               next = page_no + 1;
+                       page_no = next;
+               }
+               
                cnt--;  // remove stopper key
                fprintf(stderr, " Total keys read %d\n", cnt);
                break;