]> pd.if.org Git - btree/commitdiff
incorporate reworked bt_deletekey into file I/O version
authorunknown <karl@E04.petzent.com>
Thu, 13 Feb 2014 20:01:55 +0000 (12:01 -0800)
committerunknown <karl@E04.petzent.com>
Thu, 13 Feb 2014 20:01:55 +0000 (12:01 -0800)
btree2t.c

index 00c6f24ccc0c6ccad212e1be350dcc6f91858bfe..67d901ec61b90ac6a85d8270ecf35b1cbe6825fa 100644 (file)
--- a/btree2t.c
+++ b/btree2t.c
@@ -1,5 +1,5 @@
 // btree version 2t
 // btree version 2t
-// 04 FEB 2014
+// 15 FEB 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -119,10 +119,13 @@ typedef struct BtPage_ {
        uint cnt;                                       // count of keys in page
        uint act;                                       // count of active keys
        uint min;                                       // next key offset
        uint cnt;                                       // count of keys in page
        uint act;                                       // count of active keys
        uint min;                                       // next key offset
-       unsigned char bits;                     // page size in bits
-       unsigned char lvl:6;            // level of page
+       unsigned char bits:7;           // page size in bits
+       unsigned char free:1;           // page is on free list
+       unsigned char lvl:4;            // level of page
+       unsigned char kill:1;           // page is empty
        unsigned char dirty:1;          // page is dirty
        unsigned char posted:1;         // page fence is posted
        unsigned char dirty:1;          // page is dirty
        unsigned char posted:1;         // page fence is posted
+       unsigned char goright:1;        // continue to right link
        unsigned char right[BtId];      // page number to right
        unsigned char fence[256];       // page fence key
 } *BtPage;
        unsigned char right[BtId];      // page number to right
        unsigned char fence[256];       // page fence key
 } *BtPage;
@@ -160,6 +163,7 @@ typedef struct _BtDb {
        uint mode;                      // read-write mode
        uint mapped_io;         // use memory mapping
        BtPage temp;            // temporary frame buffer (memory mapped/file IO)
        uint mode;                      // read-write mode
        uint mapped_io;         // use memory mapping
        BtPage temp;            // temporary frame buffer (memory mapped/file IO)
+       BtPage temp2;           // temporary frame buffer (memory mapped/file IO)
        BtPage parent;          // current page's parent node (memory mapped/file IO)
        BtPage alloc;           // frame for alloc page  (memory mapped/file IO)
        BtPage cursor;          // cached frame for start/next (never mapped)
        BtPage parent;          // current page's parent node (memory mapped/file IO)
        BtPage alloc;           // frame for alloc page  (memory mapped/file IO)
        BtPage cursor;          // cached frame for start/next (never mapped)
@@ -216,6 +220,7 @@ extern uint bt_tod (BtDb *bt, uint slot);
 //  BTree page number constants
 #define ALLOC_page             0
 #define ROOT_page              1
 //  BTree page number constants
 #define ALLOC_page             0
 #define ROOT_page              1
+#define LEAF_page              2
 
 //     Number of levels to create in a new BTree
 
 
 //     Number of levels to create in a new BTree
 
@@ -565,10 +570,10 @@ SYSTEM_INFO sysinfo[1];
                bt->seg_bits++;
 
 #ifdef unix
                bt->seg_bits++;
 
 #ifdef unix
-       bt->mem = malloc (7 *bt->page_size);
+       bt->mem = malloc (8 *bt->page_size);
        bt->cache = calloc (bt->hashsize, sizeof(ushort));
 #else
        bt->cache = calloc (bt->hashsize, sizeof(ushort));
 #else
-       bt->mem = VirtualAlloc(NULL, 7 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
+       bt->mem = VirtualAlloc(NULL, 8 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
        bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, bt->hashsize * sizeof(ushort));
 #endif
        bt->frame = (BtPage)bt->mem;
        bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, bt->hashsize * sizeof(ushort));
 #endif
        bt->frame = (BtPage)bt->mem;
@@ -576,8 +581,9 @@ SYSTEM_INFO sysinfo[1];
        bt->page = (BtPage)(bt->mem + 2 * bt->page_size);
        bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size);
        bt->temp = (BtPage)(bt->mem + 4 * bt->page_size);
        bt->page = (BtPage)(bt->mem + 2 * bt->page_size);
        bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size);
        bt->temp = (BtPage)(bt->mem + 4 * bt->page_size);
-       bt->parent = (BtPage)(bt->mem + 5 * bt->page_size);
-       bt->zero = (BtPage)(bt->mem + 6 * bt->page_size);
+       bt->temp2 = (BtPage)(bt->mem + 5 * bt->page_size);
+       bt->parent = (BtPage)(bt->mem + 6 * bt->page_size);
+       bt->zero = (BtPage)(bt->mem + 7 * bt->page_size);
 
        if( size || *amt ) {
                if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
 
        if( size || *amt ) {
                if ( bt_unlockpage(bt, ALLOC_page, lockmode) )
@@ -981,18 +987,12 @@ int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
 {
 uint diff, higher = set->page->cnt, low = 1, slot;
 
 {
 uint diff, higher = set->page->cnt, low = 1, slot;
 
-       //      is page being deleted?  if so,
-       //      tell caller to follow right link
-
-       if( !set->page->act )
-               return 0;
-
        //      make stopper key an infinite fence value
 
        if( bt_getid (set->page->right) )
                higher++;
 
        //      make stopper key an infinite fence value
 
        if( bt_getid (set->page->right) )
                higher++;
 
-       //      low is the next candidate, higher is already
+       //      low is the lowest candidate, higher is already
        //      tested as .ge. the given key, loop ends when they meet
 
        while( diff = higher - low ) {
        //      tested as .ge. the given key, loop ends when they meet
 
        while( diff = higher - low ) {
@@ -1025,7 +1025,6 @@ int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lv
 uid page_no = ROOT_page, prevpage = 0;
 uint drill = 0xff, slot;
 uint mode, prevmode;
 uid page_no = ROOT_page, prevpage = 0;
 uint drill = 0xff, slot;
 uint mode, prevmode;
-int posted = 1;
 
   //  start at root of btree and drill down
 
 
   //  start at root of btree and drill down
 
@@ -1077,12 +1076,35 @@ int posted = 1;
        prevpage = page_no;
        prevmode = mode;
 
        prevpage = page_no;
        prevmode = mode;
 
+       //      if page is being deleted and we should continue right
+
+       if( set->page->kill && set->page->goright ) {
+               page_no = bt_getid (set->page->right);
+               continue;
+       }
+
+       //      otherwise, wait for deleted node to clear
+
+       if( set->page->kill ) {
+               if( bt_unlockpage(bt, set->page_no, mode) )
+                       return bt->err;
+               page_no = ROOT_page;
+               prevpage = 0;
+               drill = 0xff;
+#ifdef unix
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
+
        //  find key on page at this level
        //  and descend to requested level
 
        if( slot = bt_findslot (set, key, len) ) {
          if( drill == lvl )
        //  find key on page at this level
        //  and descend to requested level
 
        if( slot = bt_findslot (set, key, len) ) {
          if( drill == lvl )
-               return bt->posted = posted, slot;
+               return slot;
 
          if( slot > set->page->cnt )
                return bt->err = BTERR_struct, 0;
 
          if( slot > set->page->cnt )
                return bt->err = BTERR_struct, 0;
@@ -1096,7 +1118,6 @@ int posted = 1;
                        return bt->err = BTERR_struct, 0;
 
          page_no = bt_getid(slotptr(set->page, slot)->id);
                        return bt->err = BTERR_struct, 0;
 
          page_no = bt_getid(slotptr(set->page, slot)->id);
-         posted = 1;
          drill--;
          continue;
        }
          drill--;
          continue;
        }
@@ -1105,7 +1126,6 @@ int posted = 1;
        //  (slide left from deleted page)
 
        page_no = bt_getid(set->page->right);
        //  (slide left from deleted page)
 
        page_no = bt_getid(set->page->right);
-       posted = 0;
 
   } while( page_no );
 
 
   } while( page_no );
 
@@ -1116,92 +1136,78 @@ int posted = 1;
 }
 
 // drill down fixing fence values for left sibling tree
 }
 
 // drill down fixing fence values for left sibling tree
-//     starting with current left page in bt->temp
-//  call with bt->temp write locked
-//     return with all pages unlocked.
 
 
-BTERR bt_fixfences (BtDb *bt, uid page_no, unsigned char *newfence)
+//  call with set write locked mapped to bt->temp
+//     return with set unlocked & unpinned.
+
+BTERR bt_fixfences (BtDb *bt, BtPageSet *set, unsigned char *newfence)
 {
 unsigned char oldfence[256];
 {
 unsigned char oldfence[256];
-uid prevpage = 0;
+BtPageSet next[1];
+uid right;
 int chk;
 
 int chk;
 
-  memcpy (oldfence, bt->temp->fence, 256);
-
-  //  start at left page of btree and drill down
-  //  to update their fence values
-
-  do {
-       // obtain access lock using lock chaining
+  next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
+  memcpy (oldfence, set->page->fence, 256);
+  next->page = bt->temp2;
+  bt->temp2 = bt->temp;
+  bt->temp = next->page;
 
 
-       if( prevpage ) {
-         if( bt_unlockpage(bt, prevpage, BtLockWrite) )
+  while( !set->page->kill && set->page->lvl ) {
+       if( bt_lockpage (bt, next->page_no, BtLockParent) )
                return bt->err;
                return bt->err;
-         if( bt_unlockpage(bt, prevpage, BtLockParent) )
+       if( bt_lockpage (bt, next->page_no, BtLockAccess) )
                return bt->err;
                return bt->err;
-
-         // obtain parent/fence key maintenance lock
-
-         if( bt_lockpage(bt, page_no, BtLockParent) )
-               return bt->err;                                                                 
-
-         if( bt_lockpage(bt, page_no, BtLockAccess) )
-               return bt->err;                                                                 
-
-         if( bt_unlockpage(bt, prevpage, BtLockWrite) )
+       if( bt_lockpage (bt, next->page_no, BtLockWrite) )
+               return bt->err;
+       if( bt_unlockpage (bt, next->page_no, BtLockAccess) )
                return bt->err;
 
                return bt->err;
 
-         // obtain write lock using lock chaining
-
-         if( bt_lockpage(bt, page_no, BtLockWrite) )
-               return bt->err;                                                                 
-
-         if( bt_unlockpage(bt, page_no, BtLockAccess) )
-               return bt->err;                                                                 
-
-         //  map/obtain page contents
-
-         if( bt_mappage (bt, &bt->temp, page_no) )
+       if( bt_mappage (bt, &next->page, next->page_no) )
                return bt->err;
                return bt->err;
-       }
 
 
-       chk = keycmp ((BtKey)bt->temp->fence, oldfence + 1, *oldfence);
-    prevpage = page_no;
+       chk = keycmp ((BtKey)next->page->fence, oldfence + 1, *oldfence);
 
        if( chk < 0 ) {
 
        if( chk < 0 ) {
-               page_no = bt_getid (bt->temp->right);
-               continue;
+         right = bt_getid (next->page->right);
+         if( bt_unlockpage (bt, next->page_no, BtLockWrite) )
+               return bt->err;
+         if( bt_unlockpage (bt, next->page_no, BtLockParent) )
+               return bt->err;
+         next->page_no = right;
+         continue;
        }
 
        if( chk > 0 )
        }
 
        if( chk > 0 )
-               return bt->err = BTERR_struct;
-
-       memcpy (bt->temp->fence, newfence, 256);
-
-       if( bt_update (bt, bt->temp, page_no) )
-               return bt->err;
+         return bt->err = BTERR_struct;
 
 
-       //  return when we reach a leaf page
+       if( bt_fixfences (bt, next, newfence) )
+         return bt->err;
 
 
-       if( !bt->temp->lvl ) {
-               if( bt_unlockpage (bt, page_no, BtLockWrite) )
-                       return bt->err;
-               return bt_unlockpage (bt, page_no, BtLockParent);
-       }
+       break;
+  }
 
 
-       page_no = bt_getid(slotptr(bt->temp, bt->temp->cnt)->id);
+  set->page = bt->temp;
 
 
-  } while( page_no );
+  if( bt_mappage (bt, &set->page, set->page_no) )
+       return bt->err;
 
 
-  // return error on end of right chain
+  memcpy (set->page->fence, newfence, 256);
 
 
-  return bt->err = BTERR_struct;
+  if( bt_update(bt, set->page, set->page_no) )
+       return bt->err;
+  if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
+       return bt->err;
+  if( bt_unlockpage (bt, set->page_no, BtLockParent) )
+       return bt->err;
+  return 0;
 }
 
 }
 
+
 //     return page to free list
 //     page must be delete & write locked
 
 //     return page to free list
 //     page must be delete & write locked
 
-BTERR bt_freepage (BtDb *bt, BtPage page, uid page_no)
+BTERR bt_freepage (BtDb *bt, BtPageSet *set)
 {
        //      lock & map allocation page
 
 {
        //      lock & map allocation page
 
@@ -1212,12 +1218,13 @@ BTERR bt_freepage (BtDb *bt, BtPage page, uid page_no)
                return bt->err;
 
        //      store chain in second right
                return bt->err;
 
        //      store chain in second right
-       bt_putid(page->right, bt_getid(bt->alloc[1].right));
-       bt_putid(bt->alloc[1].right, page_no);
+       bt_putid(set->page->right, bt_getid(bt->alloc[1].right));
+       bt_putid(bt->alloc[1].right, set->page_no);
+       set->page->free = 1;
 
        if( bt_update(bt, bt->alloc, ALLOC_page) )
                return bt->err;
 
        if( bt_update(bt, bt->alloc, ALLOC_page) )
                return bt->err;
-       if( bt_update(bt, page, page_no) )
+       if( bt_update(bt, set->page, set->page_no) )
                return bt->err;
 
        // unlock page zero 
                return bt->err;
 
        // unlock page zero 
@@ -1227,15 +1234,15 @@ BTERR bt_freepage (BtDb *bt, BtPage page, uid page_no)
 
        //  remove write lock on deleted node
 
 
        //  remove write lock on deleted node
 
-       if( bt_unlockpage(bt, page_no, BtLockWrite) )
+       if( bt_unlockpage(bt, set->page_no, BtLockWrite) )
                return bt->err;
 
                return bt->err;
 
-       return bt_unlockpage (bt, page_no, BtLockDelete);
+       return bt_unlockpage (bt, set->page_no, BtLockDelete);
 }
 
 //     remove the root level by promoting its only child
 
 }
 
 //     remove the root level by promoting its only child
 
-BTERR bt_removeroot (BtDb *bt, BtPage root, BtPage child, uid page_no)
+BTERR bt_removeroot (BtDb *bt, BtPageSet *root, BtPageSet *child)
 {
 uid next = 0;
 
 {
 uid next = 0;
 
@@ -1246,20 +1253,20 @@ uid next = 0;
          if( bt_lockpage (bt, next, BtLockWrite) )
                return bt->err;
 
          if( bt_lockpage (bt, next, BtLockWrite) )
                return bt->err;
 
-         if( bt_mappage (bt, &child, next) )
+         if( bt_mappage (bt, &child->page, next) )
                return bt->err;
 
                return bt->err;
 
-         page_no = next;
+         child->page_no = next;
        }
 
        }
 
-       memcpy (root, child, bt->page_size);
-       next = bt_getid (slotptr(child, child->cnt)->id);
+       memcpy (root->page, child->page, bt->page_size);
+       next = bt_getid (slotptr(child->page, child->page->cnt)->id);
 
 
-       if( bt_freepage (bt, child, page_no) )
+       if( bt_freepage (bt, child) )
                return bt->err;
                return bt->err;
-  } while( root->lvl > 1 && root->cnt == 1 );
+  } while( root->page->lvl > 1 && root->page->cnt == 1 );
 
 
-  if( bt_update (bt, root, ROOT_page) )
+  if( bt_update (bt, root->page, ROOT_page) )
        return bt->err;
 
   return bt_unlockpage (bt, ROOT_page, BtLockWrite);
        return bt->err;
 
   return bt_unlockpage (bt, ROOT_page, BtLockWrite);
@@ -1267,42 +1274,12 @@ uid next = 0;
 
 //  pull right page over ourselves in simple merge
 
 
 //  pull right page over ourselves in simple merge
 
-BTERR bt_mergeright (BtDb *bt, uid page_no, BtPageSet *parent, uint slot)
+BTERR bt_mergeright (BtDb *bt, BtPageSet *set, BtPageSet *parent, BtPageSet *right, uint slot, uint idx)
 {
 {
-uid right;
-uint idx;
-
-       // find our right neighbor
-       //      right must exist because the stopper prevents
-       //      the rightmost page from deleting
-
-       for( idx = slot; idx++ < parent->page->cnt; )
-         if( !slotptr(parent->page, idx)->dead )
-               break;
-
-       right = bt_getid (slotptr (parent->page, idx)->id);
-
-       if( right != bt_getid (bt->page->right) )
-               return bt->err = BTERR_struct;
-
-       if( bt_lockpage (bt, right, BtLockDelete) )
-               return bt->err;
-
-       if( bt_lockpage (bt, right, BtLockWrite) )
-               return bt->err;
-
-       if( bt_mappage (bt, &bt->temp, right) )
-               return bt->err;
-
-       memcpy (bt->page, bt->temp, bt->page_size);
-
-       if( bt_update(bt, bt->page, page_no) )
-               return bt->err;
-
        //  install ourselves as child page
        //      and delete ourselves from parent
 
        //  install ourselves as child page
        //      and delete ourselves from parent
 
-       bt_putid (slotptr(parent->page, idx)->id, page_no);
+       bt_putid (slotptr(parent->page, idx)->id, set->page_no);
        slotptr(parent->page, slot)->dead = 1;
        parent->page->act--;
 
        slotptr(parent->page, slot)->dead = 1;
        parent->page->act--;
 
@@ -1315,15 +1292,20 @@ uint idx;
          } else
                  break;
 
          } else
                  break;
 
-       if( bt_freepage (bt, bt->temp, right) )
+       memcpy (set->page, right->page, bt->page_size);
+
+       if( bt_unlockpage (bt, right->page_no, BtLockParent) )
+               return bt->err;
+
+       if( bt_freepage (bt, right) )
                return bt->err;
 
        //      do we need to remove a btree level?
        //      (leave the first page of leaves alone)
 
        if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
                return bt->err;
 
        //      do we need to remove a btree level?
        //      (leave the first page of leaves alone)
 
        if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
-         if( bt->page->lvl )
-               return bt_removeroot (bt, parent->page, bt->page, page_no);
+         if( set->page->lvl )
+               return bt_removeroot (bt, parent, set);
 
        if( bt_update (bt, parent->page, parent->page_no) )
                return bt->err;
 
        if( bt_update (bt, parent->page, parent->page_no) )
                return bt->err;
@@ -1331,10 +1313,13 @@ uint idx;
        if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
                return bt->err;
 
        if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
                return bt->err;
 
-       if( bt_unlockpage (bt, page_no, BtLockWrite) )
+       if( bt_update (bt, set->page, set->page_no) )
                return bt->err;
 
                return bt->err;
 
-       if( bt_unlockpage (bt, page_no, BtLockDelete) )
+       if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
+               return bt->err;
+
+       if( bt_unlockpage (bt, set->page_no, BtLockDelete) )
                return bt->err;
 
        return 0;
                return bt->err;
 
        return 0;
@@ -1343,94 +1328,67 @@ uint idx;
 //     remove both child and parent from the btree
 //     from the fence position in the parent
 
 //     remove both child and parent from the btree
 //     from the fence position in the parent
 
-BTERR bt_removeparent (BtDb *bt, uid page_no, BtPageSet *parent, uint lvl)
+BTERR bt_removeparent (BtDb *bt, BtPageSet *child, BtPageSet *parent, BtPageSet  *right, BtPageSet *rparent, uint lvl)
 {
 {
-unsigned char rightfence[256], pagefence[256];
-uid right, ppage_no;
-
-       right = bt_getid (bt->page->right);
-
-       if( bt_lockpage (bt, right, BtLockParent) )
-               return bt->err;
-
-       if( bt_lockpage (bt, right, BtLockAccess) )
-               return bt->err;
-
-       if( bt_lockpage (bt, right, BtLockWrite) )
-               return bt->err;
-
-       if( bt_unlockpage (bt, right, BtLockAccess) )
-               return bt->err;
-
-       if( bt_mappage (bt, &bt->temp, right) )
-               return bt->err;
-
-       //      save right page fence value and
-       //      parent fence value
-
-       memcpy (rightfence, bt->temp->fence, 256);
-       memcpy (pagefence, parent->page->fence, 256);
-       ppage_no = parent->page_no;
+unsigned char pagefence[256];
+uint idx;
 
        //  pull right sibling over ourselves and unlock
 
 
        //  pull right sibling over ourselves and unlock
 
-       memcpy (bt->page, bt->temp, bt->page_size);
-
-       if( bt_update(bt, bt->page, page_no) )
-               return bt->err;
+       memcpy (child->page, right->page, bt->page_size);
 
 
-       if( bt_unlockpage (bt, page_no, BtLockDelete) )
+       if( bt_update(bt, child->page, child->page_no) )
                return bt->err;
 
                return bt->err;
 
-       if( bt_unlockpage (bt, page_no, BtLockWrite) )
+       if( bt_unlockpage (bt, child->page_no, BtLockWrite) )
                return bt->err;
 
                return bt->err;
 
-       //  install ourselves into right link from old right page
+       //  install ourselves into right link of old right page
 
 
-       bt->temp->act = 0;              // tell bt_findslot to go right (left)
-       bt_putid (bt->temp->right, page_no);
+       bt_putid (right->page->right, child->page_no);
+       right->page->goright = 1;               // tell bt_loadpage to go right to us
+       right->page->kill = 1;
 
 
-       if( bt_update (bt, bt->temp, right) )
+       if( bt_update(bt, right->page, right->page_no) )
                return bt->err;
 
                return bt->err;
 
-       if( bt_unlockpage (bt, right, BtLockWrite) )
+       if( bt_unlockpage (bt, right->page_no, BtLockWrite) )
                return bt->err;
 
        //      remove our slot from our parent
                return bt->err;
 
        //      remove our slot from our parent
-       //      clear act to signal bt_findslot to move right
+       //      signal to move right
 
 
-       slotptr(parent->page, parent->page->cnt)->dead = 1;
-       parent->page->act = 0;
+       parent->page->goright = 1;              // tell bt_findslot to go right to rparent
+       parent->page->kill = 1;
+       parent->page->act--;
 
 
-       if( bt_update (bt, parent->page, ppage_no) )
-               return bt->err;
-       if( bt_unlockpage (bt, ppage_no, BtLockWrite) )
-               return bt->err;
+       //      redirect right page pointer in right parent to us
 
 
-       //      redirect right page pointer in its parent to our left
-       //      and free the right page
+       for( idx = 0; idx++ < rparent->page->cnt; )
+         if( !slotptr(rparent->page, idx)->dead )
+               break;
 
 
-       if( bt_insertkey (bt, rightfence+1, *rightfence, lvl, page_no, time(NULL)) )
-               return bt->err;
+       if( bt_getid (slotptr(rparent->page, idx)->id) != right->page_no )
+               return bt->err = BTERR_struct;
+
+       bt_putid (slotptr(rparent->page, idx)->id, child->page_no);
 
 
-       if( bt_removepage (bt, ppage_no, lvl, pagefence) )
+       if( bt_update (bt, rparent->page, rparent->page_no) )
                return bt->err;
 
                return bt->err;
 
-       if( bt_unlockpage (bt, right, BtLockParent) )
+       if( bt_unlockpage (bt, rparent->page_no, BtLockWrite) )
                return bt->err;
 
                return bt->err;
 
-       //      wait for others to drain away
+       //      save parent page fence value
 
 
-       if( bt_lockpage (bt, right, BtLockDelete) )
-               return bt->err;
+       memcpy (pagefence, parent->page->fence, 256);
 
 
-       if( bt_lockpage (bt, right, BtLockWrite) )
+       if( bt_update (bt, parent->page, parent->page_no) )
                return bt->err;
                return bt->err;
-
-       if( bt_mappage (bt, &bt->temp, right) )
+       if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
                return bt->err;
 
                return bt->err;
 
-       return bt_freepage (bt, bt->temp, right);
+       return bt_removepage (bt, parent->page_no, lvl, pagefence);
 }
 
 //     remove page from btree
 }
 
 //     remove page from btree
@@ -1439,22 +1397,24 @@ uid right, ppage_no;
 
 BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence)
 {
 
 BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence)
 {
-BtPageSet parent[1];
+BtPageSet parent[1], rparent[1], sibling[1], set[1];
+unsigned char newfence[256];
 uint slot, idx;
 BtKey ptr;
 uint slot, idx;
 BtKey ptr;
-uid left;
 
 
-       parent->page = bt->parent;
+  parent->page = bt->parent;
+  set->page_no = page_no;
+  set->page = bt->page;
 
        //      load and lock our parent
 
 
        //      load and lock our parent
 
-retry:
+  while( 1 ) {
        if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
                return bt->err;
 
        //      wait until we are posted in our parent
 
        if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
                return bt->err;
 
        //      wait until we are posted in our parent
 
-       if( !bt->posted ) {
+       if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) {
                if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
                        return bt->err;
 #ifdef unix
                if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
                        return bt->err;
 #ifdef unix
@@ -1462,37 +1422,74 @@ retry:
 #else
                SwitchToThread();
 #endif
 #else
                SwitchToThread();
 #endif
-               goto retry;
+               continue;
        }
 
        }
 
-       //      wait for others to finish
-       //      and obtain final WriteLock
+       //      can we do a simple merge entirely
+       //      between siblings on the parent page?
 
 
-       if( bt_lockpage (bt, page_no, BtLockDelete) )
-               return bt->err;
+       if( slot < parent->page->cnt ) {
+               // find our right neighbor
+               //      right must exist because the stopper prevents
+               //      the rightmost page from deleting
 
 
-       if( bt_lockpage (bt, page_no, BtLockWrite) )
-               return bt->err;
+               for( idx = slot; idx++ < parent->page->cnt; )
+                 if( !slotptr(parent->page, idx)->dead )
+                       break;
 
 
-       if( bt_mappage (bt, &bt->page, page_no) )
-               return bt->err;
+               sibling->page_no = bt_getid (slotptr (parent->page, idx)->id);
 
 
-       //  was page re-established?
+               if( bt_lockpage (bt, set->page_no, BtLockDelete) )
+                       return bt->err;
 
 
-       if( bt->page->act ) {
-               if( bt_unlockpage (bt, page_no, BtLockWrite) )
+               if( bt_lockpage (bt, set->page_no, BtLockWrite) )
                        return bt->err;
                        return bt->err;
-               if( bt_unlockpage (bt, page_no, BtLockDelete) )
+
+               if( bt_mappage (bt, &set->page, set->page_no) )
                        return bt->err;
 
                        return bt->err;
 
-               return bt_unlockpage (bt, parent->page_no, BtLockWrite);
-       }
-               
-       //      can we do a simple merge entirely
-       //      between siblings on the parent page?
+               //      merge right if sibling shows up in
+               //  our parent and is not being killed
 
 
-       if( slot < parent->page->cnt )
-               return bt_mergeright(bt, page_no, parent, slot);
+               if( sibling->page_no == bt_getid (set->page->right) ) {
+                 if( bt_lockpage (bt, sibling->page_no, BtLockParent) )
+                       return bt->err;
+
+                 if( bt_lockpage (bt, sibling->page_no, BtLockDelete) )
+                       return bt->err;
+
+                 if( bt_lockpage (bt, sibling->page_no, BtLockWrite) )
+                       return bt->err;
+
+                 sibling->page = bt->temp;
+
+                 if( bt_mappage (bt, &sibling->page, sibling->page_no) )
+                       return bt->err;
+
+                 if( !sibling->page->kill )
+                       return bt_mergeright(bt, set, parent, sibling, slot, idx);
+
+                 //  try again later
+
+                 if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) )
+                       return bt->err;
+               }
+
+               if( bt_unlockpage (bt, set->page_no, BtLockDelete) )
+                       return bt->err;
+
+               if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
+                       return bt->err;
+
+               if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
+                       return bt->err;
+#ifdef linux
+               sched_yield ();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
 
        //  find our left neighbor in our parent page
 
 
        //  find our left neighbor in our parent page
 
@@ -1502,51 +1499,127 @@ retry:
 
        //      if no left neighbor, delete ourselves and our parent
 
 
        //      if no left neighbor, delete ourselves and our parent
 
-       if( !idx )
-               return bt_removeparent (bt, page_no, parent, lvl+1);
+       if( !idx ) {
+               if( bt_lockpage (bt, set->page_no, BtLockAccess) )
+                       return bt->err;
+
+               if( bt_lockpage (bt, set->page_no, BtLockWrite) )
+                       return bt->err;
+
+               if( bt_unlockpage (bt, set->page_no, BtLockAccess) )
+                       return bt->err;
+
+               if( bt_mappage (bt, &set->page, set->page_no) )
+                       return bt->err;
+
+               rparent->page_no = bt_getid (parent->page->right);
+               rparent->page = bt->temp;
+
+               if( bt_lockpage (bt, rparent->page_no, BtLockAccess) )
+                       return bt->err;
+
+               if( bt_lockpage (bt, rparent->page_no, BtLockWrite) )
+                       return bt->err;
+
+               if( bt_unlockpage (bt, rparent->page_no, BtLockAccess) )
+                       return bt->err;
+
+               if( bt_mappage (bt, &rparent->page, rparent->page_no) )
+                       return bt->err;
+
+               if( !rparent->page->kill ) {
+                 sibling->page_no = bt_getid (set->page->right);
+
+                 if( bt_lockpage (bt, sibling->page_no, BtLockAccess) )
+                       return bt->err;
+
+                 if( bt_lockpage (bt, sibling->page_no, BtLockWrite) )
+                       return bt->err;
+
+                 if( bt_unlockpage (bt, sibling->page_no, BtLockAccess) )
+                       return bt->err;
+
+                 sibling->page = bt->temp2;
+
+                 if( bt_mappage (bt, &sibling->page, sibling->page_no) )
+                       return bt->err;
+
+                 if( !sibling->page->kill )
+                       return bt_removeparent (bt, set, parent, sibling, rparent, lvl+1);
+
+                 //  try again later
+
+                 if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) )
+                       return bt->err;
+               }
+
+               if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
+                       return bt->err;
+
+               if( bt_unlockpage (bt, rparent->page_no, BtLockWrite) )
+                       return bt->err;
+
+               if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
+                       return bt->err;
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
 
 
-       // lock and map our left neighbor's page
+       // redirect parent to our left sibling
+       // lock and map our left sibling's page
 
 
-       left = bt_getid (slotptr(parent->page, idx)->id);
+       sibling->page_no = bt_getid (slotptr(parent->page, idx)->id);
+       sibling->page = bt->temp;
 
        //      wait our turn on fence key maintenance
 
 
        //      wait our turn on fence key maintenance
 
-       if( bt_lockpage(bt, left, BtLockParent) )
+       if( bt_lockpage(bt, sibling->page_no, BtLockParent) )
                return bt->err;
 
                return bt->err;
 
-       if( bt_lockpage(bt, left, BtLockAccess) )
+       if( bt_lockpage(bt, sibling->page_no, BtLockAccess) )
                return bt->err;
 
                return bt->err;
 
-       if( bt_lockpage(bt, left, BtLockWrite) )
+       if( bt_lockpage(bt, sibling->page_no, BtLockWrite) )
                return bt->err;
 
                return bt->err;
 
-       if( bt_unlockpage(bt, left, BtLockAccess) )
+       if( bt_unlockpage(bt, sibling->page_no, BtLockAccess) )
                return bt->err;
 
                return bt->err;
 
-       if( bt_mappage (bt, &bt->temp, left) )
+       if( bt_mappage (bt, &sibling->page, sibling->page_no) )
                return bt->err;
 
        //  wait until sibling is in our parent
 
                return bt->err;
 
        //  wait until sibling is in our parent
 
-       if( bt_getid (bt->temp->right) != page_no ) {
+       if( bt_getid (sibling->page->right) != set->page_no ) {
                if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
                        return bt->err;
                if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
                        return bt->err;
-               if( bt_unlockpage (bt, left, BtLockWrite) )
-                       return bt->err;
-               if( bt_unlockpage (bt, left, BtLockParent) )
+               if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) )
                        return bt->err;
                        return bt->err;
-               if( bt_unlockpage (bt, page_no, BtLockWrite) )
-                       return bt->err;
-               if( bt_unlockpage (bt, page_no, BtLockDelete) )
+               if( bt_unlockpage (bt, sibling->page_no, BtLockParent) )
                        return bt->err;
 #ifdef linux
                sched_yield();
 #else
                SwitchToThread();
 #endif
                        return bt->err;
 #ifdef linux
                sched_yield();
 #else
                SwitchToThread();
 #endif
-               goto retry;
+               continue;
        }
 
        }
 
+       //      map page being killed
+
+       if( bt_lockpage (bt, set->page_no, BtLockDelete) )
+               return bt->err;
+
+       if( bt_lockpage (bt, set->page_no, BtLockWrite) )
+               return bt->err;
+
+       if( bt_mappage (bt, &set->page, set->page_no) )
+               return bt->err;
+
        //      delete our left sibling from parent
 
        slotptr(parent->page,idx)->dead = 1;
        //      delete our left sibling from parent
 
        slotptr(parent->page,idx)->dead = 1;
@@ -1555,11 +1628,11 @@ retry:
 
        //      redirect our parent slot to our left sibling
 
 
        //      redirect our parent slot to our left sibling
 
-       bt_putid (slotptr(parent->page, slot)->id, left);
-
-       //      update left page with our fence key
+       bt_putid (slotptr(parent->page, slot)->id, sibling->page_no);
+       memcpy (sibling->page->right, set->page->right, BtId);
 
 
-       memcpy (bt->temp->right, bt->page->right, BtId);
+       if( bt_update (bt, sibling->page, sibling->page_no) )
+               return bt->err;
 
        //      collapse dead slots from parent
 
 
        //      collapse dead slots from parent
 
@@ -1575,21 +1648,37 @@ retry:
        if( bt_update (bt, parent->page, parent->page_no) )
                return bt->err;
 
        if( bt_update (bt, parent->page, parent->page_no) )
                return bt->err;
 
+       //  free our original page
+
+       if( bt_freepage (bt, set) )
+               return bt->err;
+
        //      go down the left node's fence keys to the leaf level
        //      and update the fence keys in each page
 
        //      go down the left node's fence keys to the leaf level
        //      and update the fence keys in each page
 
-       if( bt_fixfences (bt, left, pagefence) )
+       memcpy (newfence, parent->page->fence, 256);
+
+       if( bt_fixfences (bt, sibling, newfence) )
                return bt->err;
 
                return bt->err;
 
-       if( bt_unlockpage (bt, parent->page_no, BtLockWrite) )
+       //      promote sibling as new root?
+
+       if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
+        if( sibling->page->lvl ) {
+         if( bt_lockpage (bt, sibling->page_no, BtLockDelete) )
                return bt->err;
 
                return bt->err;
 
-       //  free our original page
+         if( bt_lockpage (bt, sibling->page_no, BtLockWrite) )
+               return bt->err;
 
 
-       if( bt_mappage (bt, &bt->temp, page_no) )
+         if( bt_mappage (bt, &sibling->page, set->page_no) )
                return bt->err;
 
                return bt->err;
 
-       return bt_freepage (bt, bt->temp,page_no);
+         return bt_removeroot (bt, parent, sibling);
+        }
+
+       return bt_unlockpage (bt, parent->page_no, BtLockWrite);
+  }
 }
 
 //  find and delete key on page by marking delete flag bit
 }
 
 //  find and delete key on page by marking delete flag bit
@@ -1598,7 +1687,7 @@ retry:
 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
 {
 unsigned char pagefence[256];
 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
 {
 unsigned char pagefence[256];
-uint slot, found, act, idx;
+uint slot, found, idx;
 BtPageSet set[1];
 BtKey ptr;
 
 BtPageSet set[1];
 BtKey ptr;
 
@@ -1626,21 +1715,27 @@ BtKey ptr;
                          memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
                        } else
                                break;
                          memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
                        } else
                                break;
+               }
 
 
-                 if( bt_update(bt, set->page, set->page_no) )
+       if( set->page->act ) {
+               if( bt_update(bt, set->page, set->page_no) )
                        return bt->err;
                        return bt->err;
-               }
+               bt->found = found;
+               return bt_unlockpage (bt, set->page_no, BtLockWrite);
+       }
 
        // delete page when empty
 
        memcpy (pagefence, set->page->fence, 256);
 
        // delete page when empty
 
        memcpy (pagefence, set->page->fence, 256);
-       act = set->page->act;
+       set->page->kill = 1;
+
+       if( bt_update(bt, set->page, set->page_no) )
+               return bt->err;
 
        if( bt_unlockpage(bt, set->page_no, BtLockWrite) )
                return bt->err;
 
 
        if( bt_unlockpage(bt, set->page_no, BtLockWrite) )
                return bt->err;
 
-       if( !act )
-         if( bt_removepage (bt, set->page_no, 0, pagefence) )
+       if( bt_removepage (bt, set->page_no, 0, pagefence) )
                return bt->err;
 
        bt->found = found;
                return bt->err;
 
        bt->found = found;
@@ -1711,15 +1806,18 @@ BtKey key;
                if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
                if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
+               off = slotptr(bt->frame,cnt)->off;
+
                // copy key
                // copy key
-               if( !page->lvl || cnt < max ) {
+
+               if( off >= sizeof(*page) ) {
                        key = keyptr(bt->frame, cnt);
                        off = nxt -= key->len + 1;
                        memcpy ((unsigned char *)page + nxt, key, key->len + 1);
                        key = keyptr(bt->frame, cnt);
                        off = nxt -= key->len + 1;
                        memcpy ((unsigned char *)page + nxt, key, key->len + 1);
-               } else
-                       off = offsetof(struct BtPage_, fence);
+               }
 
                // copy slot
 
                // copy slot
+
                memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
                slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
                slotptr(page, idx)->off = off;
                memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
                slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
                slotptr(page, idx)->off = off;
@@ -1793,9 +1891,9 @@ uid new_page;
 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
 {
 uint cnt = 0, idx = 0, max, nxt = bt->page_size, off;
 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
 {
 uint cnt = 0, idx = 0, max, nxt = bt->page_size, off;
-uid right, page_no = set->page_no;
 unsigned char fencekey[256];
 uint lvl = set->page->lvl;
 unsigned char fencekey[256];
 uint lvl = set->page->lvl;
+uid right;
 BtKey key;
 
        //  split higher half of keys to bt->frame
 BtKey key;
 
        //  split higher half of keys to bt->frame
@@ -1819,7 +1917,7 @@ BtKey key;
                bt->frame->act++;
        }
 
                bt->frame->act++;
        }
 
-       if( page_no == ROOT_page )
+       if( set->page_no == ROOT_page )
                bt->frame->posted = 1;
 
        memcpy (bt->frame->fence, set->page->fence, 256);
                bt->frame->posted = 1;
 
        memcpy (bt->frame->fence, set->page->fence, 256);
@@ -1830,7 +1928,7 @@ BtKey key;
 
        // link right node
 
 
        // link right node
 
-       if( page_no > ROOT_page )
+       if( set->page_no > ROOT_page )
                memcpy (bt->frame->right, set->page->right, BtId);
 
        //      get new free page and write higher keys to it.
                memcpy (bt->frame->right, set->page->right, BtId);
 
        //      get new free page and write higher keys to it.
@@ -1877,65 +1975,53 @@ BtKey key;
 
        // if current page is the root page, split it
 
 
        // if current page is the root page, split it
 
-       if( page_no == ROOT_page )
+       if( set->page_no == ROOT_page )
                return bt_splitroot (bt, set, right);
 
                return bt_splitroot (bt, set, right);
 
-       if( bt_update (bt, set->page, page_no) )
+       if( bt_update (bt, set->page, set->page_no) )
                return bt->err; 
 
                return bt->err; 
 
-       if( bt_unlockpage (bt, page_no, BtLockWrite) )
+       if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
                return bt->err;
 
        // insert new fences in their parent pages
 
        while( 1 ) {
                return bt->err;
 
        // insert new fences in their parent pages
 
        while( 1 ) {
-               if( bt_lockpage (bt, page_no, BtLockParent) )
+               if( bt_lockpage (bt, set->page_no, BtLockParent) )
                        return bt->err;
 
                        return bt->err;
 
-               if( bt_lockpage (bt, page_no, BtLockRead) )
+               if( bt_lockpage (bt, set->page_no, BtLockWrite) )
                        return bt->err;
 
                        return bt->err;
 
-               if( bt_mappage (bt, &set->page, page_no) )
+               if( bt_mappage (bt, &set->page, set->page_no) )
                        return bt->err;
 
                memcpy (fencekey, set->page->fence, 256);
                        return bt->err;
 
                memcpy (fencekey, set->page->fence, 256);
+               right = bt_getid (set->page->right);
 
                if( set->page->posted ) {
 
                if( set->page->posted ) {
-                 if( bt_unlockpage (bt, page_no, BtLockParent) )
+                 if( bt_unlockpage (bt, set->page_no, BtLockParent) )
                        return bt->err;
                        
                        return bt->err;
                        
-                 return bt_unlockpage (bt, page_no, BtLockRead);
+                 return bt_unlockpage (bt, set->page_no, BtLockWrite);
                }
 
                }
 
-               if( bt_unlockpage (bt, page_no, BtLockRead) )
-                       return bt->err;
-
-               if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
-                       return bt->err;
-
-               if( bt_lockpage (bt, page_no, BtLockWrite) )
-                       return bt->err;
-
-               if( bt_mappage (bt, &set->page, page_no) )
-                       return bt->err;
-
-               right = bt_getid (set->page->right);
                set->page->posted = 1;
 
                set->page->posted = 1;
 
-               if( bt_update (bt, set->page, page_no) )
+               if( bt_update (bt, set->page, set->page_no) )
                        return bt->err; 
 
                        return bt->err; 
 
-               if( bt_unlockpage (bt, page_no, BtLockWrite) )
+               if( bt_unlockpage (bt, set->page_no, BtLockWrite) )
                        return bt->err;
 
                        return bt->err;
 
-               if( bt_unlockpage (bt, page_no, BtLockParent) )
+               if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, set->page_no, time(NULL)) )
                        return bt->err;
 
                        return bt->err;
 
-               if( !(page_no = right) )
-                       break;
-
-               if( bt_mappage (bt, &set->page, page_no) )
+               if( bt_unlockpage (bt, set->page_no, BtLockParent) )
                        return bt->err;
                        return bt->err;
+
+               if( !(set->page_no = right) )
+                       break;
        }
 
        return 0;
        }
 
        return 0;
@@ -2030,7 +2116,9 @@ uint slot;
        // cache page for retrieval
        if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
                memcpy (bt->cursor, set->page, bt->page_size);
        // cache page for retrieval
        if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
                memcpy (bt->cursor, set->page, bt->page_size);
+
        bt->cursor_page = set->page_no;
        bt->cursor_page = set->page_no;
+
        if ( bt_unlockpage(bt, set->page_no, BtLockRead) )
                return 0;
 
        if ( bt_unlockpage(bt, set->page_no, BtLockRead) )
                return 0;