]> pd.if.org Git - btree/commitdiff
Replace mutex implementation, and fix a few bugs.
authorunknown <karl@E04.petzent.com>
Tue, 28 Oct 2014 02:35:20 +0000 (19:35 -0700)
committerunknown <karl@E04.petzent.com>
Tue, 28 Oct 2014 02:35:20 +0000 (19:35 -0700)
threadskv10e.c

index 62285128eecc0c0d8951ab1be3c83fdcd2320d14..fa5fdfbd7eea769f7c4a0193e857b2bbf1b32cff 100644 (file)
@@ -112,14 +112,21 @@ typedef enum{
 } BtLock;
 
 typedef struct {
 } BtLock;
 
 typedef struct {
-       volatile uint value[1];
-} BtMutexLatch;
+  union {
+       struct {
+         volatile unsigned char xcl[1];
+         volatile unsigned char filler;
+         volatile ushort waiters[1];
+       } bits[1];
+       uint value[1];
+  };
+} MutexLatch;
 
 //     definition for reader/writer reentrant lock implementation
 
 typedef struct {
 
 //     definition for reader/writer reentrant lock implementation
 
 typedef struct {
-  BtMutexLatch xcl[1];
-  BtMutexLatch wrt[1];
+  MutexLatch xcl[1];
+  MutexLatch wrt[1];
   uint readers;
   ushort dup;          // re-entrant locks
   ushort tid;          // owner thread-no
   uint readers;
   ushort dup;          // re-entrant locks
   ushort tid;          // owner thread-no
@@ -129,7 +136,7 @@ typedef struct {
 //  hash table entries
 
 typedef struct {
 //  hash table entries
 
 typedef struct {
-       BtMutexLatch latch[1];
+       MutexLatch latch[1];
        uint entry;             // Latch table entry at head of chain
 } BtHashEntry;
 
        uint entry;             // Latch table entry at head of chain
 } BtHashEntry;
 
@@ -137,7 +144,7 @@ typedef struct {
 
 typedef struct {
        uid page_no;                    // latch set page number
 
 typedef struct {
        uid page_no;                    // latch set page number
-       BtMutexLatch modify[1]; // modify entry lite latch
+       MutexLatch modify[1];   // modify entry lite latch
        RWLock readwr[1];               // read/write page lock
        RWLock access[1];               // Access Intent/Page delete
        RWLock parent[1];               // Posting of fence key in parent
        RWLock readwr[1];               // read/write page lock
        RWLock access[1];               // Access Intent/Page delete
        RWLock parent[1];               // Posting of fence key in parent
@@ -145,7 +152,7 @@ typedef struct {
        uint split;                             // right split page atomic insert
        uint next;                              // next entry in hash table chain
        uint prev;                              // prev entry in hash table chain
        uint split;                             // right split page atomic insert
        uint next;                              // next entry in hash table chain
        uint prev;                              // prev entry in hash table chain
-       ushort pin;                             // number of accessing threads
+       volatile ushort pin;    // number of accessing threads
        unsigned char dirty;    // page in cache is dirty
        unsigned char leaf;             // page in cache is a leaf
 } BtLatchSet;
        unsigned char dirty;    // page in cache is dirty
        unsigned char leaf;             // page in cache is a leaf
 } BtLatchSet;
@@ -175,6 +182,7 @@ typedef struct {
 
 typedef enum {
        Unique,
 
 typedef enum {
        Unique,
+       Update,
        Librarian,
        Duplicate,
        Delete
        Librarian,
        Duplicate,
        Delete
@@ -266,9 +274,9 @@ typedef struct {
        unsigned char *leafpool;        // mapped to the leaf pool pages
        unsigned char *redobuff;        // mapped recovery buffer pointer
        logseqno lsn, flushlsn;         // current & first lsn flushed
        unsigned char *leafpool;        // mapped to the leaf pool pages
        unsigned char *redobuff;        // mapped recovery buffer pointer
        logseqno lsn, flushlsn;         // current & first lsn flushed
-       BtMutexLatch redo[1];           // redo area lite latch
-       BtMutexLatch lock[1];           // allocation area lite latch
-       BtMutexLatch maps[1];           // mapping segments lite latch
+       MutexLatch redo[1];                     // redo area lite latch
+       MutexLatch lock[1];                     // allocation area lite latch
+       MutexLatch maps[1];                     // mapping segments lite latch
        ushort thread_no[1];            // highest thread number issued
        ushort err_thread;                      // error thread number
        uint nlatchpage;                        // size of buffer pool & latchsets
        ushort thread_no[1];            // highest thread number issued
        ushort err_thread;                      // error thread number
        uint nlatchpage;                        // size of buffer pool & latchsets
@@ -291,7 +299,7 @@ typedef struct {
        HANDLE halloc;                          // allocation handle
        HANDLE hpool;                           // buffer pool handle
 #endif
        HANDLE halloc;                          // allocation handle
        HANDLE hpool;                           // buffer pool handle
 #endif
-       uint segments;                          // number of memory mapped segments
+       volatile uint segments;         // number of memory mapped segments
        unsigned char *pages[64000];// memory mapped segments of b-tree
 } BtMgr;
 
        unsigned char *pages[64000];// memory mapped segments of b-tree
 } BtMgr;
 
@@ -453,65 +461,43 @@ int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *add
 {
        return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
 }
 {
        return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
 }
-/*
-void bt_mutexlock(BtMutexLatch *latch)
-{
-uint idx = 200;
-
- while( __sync_val_compare_and_swap (latch->value, 0, 1) )
-  if( !idx )
-       sched_yield ();
-  else
-       idx--;
-}
 
 
-int bt_mutextry(BtMutexLatch *latch)
-{
-  return !__sync_val_compare_and_swap (latch->value, 0, 1);
-}
-
-void bt_releasemutex(BtMutexLatch *latch)
-{
-  if( !__sync_lock_test_and_set (latch->value, 0) )
-       abort();
-}
-*/
-void bt_mutexlock(BtMutexLatch *latch)
+void bt_mutexlock(MutexLatch *latch)
 {
 {
-uint idx;
+uint idx, waited = 0;
+MutexLatch prev[1];
 
 
-  for( idx = 0; idx < 100; idx++ )
-       if( !__sync_val_compare_and_swap (latch->value, 0, 1) )
+ while( 1 ) {
+  for( idx = 0; idx < 100; idx++ ) {
+       *prev->value = __sync_fetch_and_or (latch->value, 1);
+       if( !*prev->bits->xcl ) {
+         if( waited )
+               __sync_fetch_and_sub (latch->bits->waiters, 1);
          return;
          return;
+       }
+  }
 
 
-  while( __sync_lock_test_and_set (latch->value, 2) )
-       sys_futex ((uint *)latch->value, FUTEX_WAIT_PRIVATE, 2, NULL, NULL, 0);
+  if( !waited ) {
+       __sync_fetch_and_add (latch->bits->waiters, 1);
+       *prev->bits->waiters += 1;
+       waited++;
+  }
+
+  sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
+ }
 }
 
 }
 
-int bt_mutextry(BtMutexLatch *latch)
+int bt_mutextry(MutexLatch *latch)
 {
 {
-  return !__sync_val_compare_and_swap (latch->value, 0, 1);
+       return !__sync_lock_test_and_set (latch->bits->xcl, 1);
 }
 
 }
 
-void bt_releasemutex(BtMutexLatch *latch)
+void bt_releasemutex(MutexLatch *latch)
 {
 {
-uint idx;
-
-  if( *latch->value == 2 )
-       *latch->value = 0;
-  else if( __sync_lock_test_and_set (latch->value, 0) == 1 )
-       return;
+       *latch->bits->xcl = 0;
 
 
-  if( latch->value[0] )
-   if( __sync_val_compare_and_swap (latch->value, 1, 2) )
-       return;
-
-//  for( idx = 0; idx < 200; idx++ )
-//     if( latch->value[0] )
-//       if( __sync_val_compare_and_swap (latch->value, 1, 2) )
-//             return;
-  
-  sys_futex( (uint *)latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
+       if( *latch->bits->waiters )
+               sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
 }
 
 //     reader/writer lock implementation
 }
 
 //     reader/writer lock implementation
@@ -525,8 +511,7 @@ void WriteLock (RWLock *lock, ushort tid, uint line)
        bt_mutexlock (lock->xcl);
        bt_mutexlock (lock->wrt);
        bt_releasemutex (lock->xcl);
        bt_mutexlock (lock->xcl);
        bt_mutexlock (lock->wrt);
        bt_releasemutex (lock->xcl);
-//if( lock->tid )
-//abort();
+
        lock->line = line;
        lock->tid = tid;
 }
        lock->line = line;
        lock->tid = tid;
 }
@@ -839,7 +824,7 @@ unsigned char *perm;
                segment++, fragment = 0;
 
        if( segment < mgr->segments ) {
                segment++, fragment = 0;
 
        if( segment < mgr->segments ) {
-         perm = mgr->pages[segment] + (fragment << mgr->page_bits);
+         perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
 
          memcpy ((unsigned char *)page + off, perm, mgr->page_size);
          off += mgr->page_size;
 
          memcpy ((unsigned char *)page + off, perm, mgr->page_size);
          off += mgr->page_size;
@@ -860,10 +845,6 @@ unsigned char *perm;
        bt_releasemutex (mgr->maps);
   }
 
        bt_releasemutex (mgr->maps);
   }
 
-//if( !leaf && !page->lvl )
-//abort();
-//if( leaf && page->lvl )
-//abort();
   return 0;
 }
 
   return 0;
 }
 
@@ -876,12 +857,6 @@ uint page_size = mgr->page_size;
 uint off = 0, segment, fragment;
 unsigned char *perm;
 
 uint off = 0, segment, fragment;
 unsigned char *perm;
 
-//if( !leaf && !page->lvl )
-//abort();
-//if( leaf && page->lvl )
-//abort();
-//if( !page->lvl && mgr->leaf_xtra == 8 )
-//fprintf(stderr, "WrPage %d\n", (uint)page_no);
   if( leaf )
        page_size <<= mgr->leaf_xtra;
 
   if( leaf )
        page_size <<= mgr->leaf_xtra;
 
@@ -894,7 +869,7 @@ unsigned char *perm;
                segment++, fragment = 0;
 
        if( segment < mgr->segments ) {
                segment++, fragment = 0;
 
        if( segment < mgr->segments ) {
-         perm = mgr->pages[segment] + (fragment << mgr->page_bits);
+         perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
          memcpy (perm, (unsigned char *)page + off, mgr->page_size);
          off += mgr->page_size;
          fragment++;
          memcpy (perm, (unsigned char *)page + off, mgr->page_size);
          off += mgr->page_size;
          fragment++;
@@ -919,11 +894,11 @@ unsigned char *perm;
 //     set CLOCK bit in latch
 //     decrement pin count
 
 //     set CLOCK bit in latch
 //     decrement pin count
 
+int value;
+
 void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
 {
        bt_mutexlock(latch->modify);
 void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
 {
        bt_mutexlock(latch->modify);
-//if( !(latch->pin & ~CLOCK_bit) )
-//abort();
        latch->pin |= CLOCK_bit;
        latch->pin--;
 
        latch->pin |= CLOCK_bit;
        latch->pin--;
 
@@ -937,14 +912,12 @@ BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
 BtPage page;
 
 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
 BtPage page;
 
-       if( latch->leaf )
-               page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
-       else
-               page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
-//if( latch->leaf )
-//if( page->lvl )
-//abort();
-       return page;
+  if( latch->leaf )
+       page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
+  else
+       page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
+
+  return page;
 }
 
 //  return next available leaf entry
 }
 
 //  return next available leaf entry
@@ -1035,12 +1008,13 @@ BtPage page;
 
   bt_mutexlock(mgr->leaftable[hashidx].latch);
 
 
   bt_mutexlock(mgr->leaftable[hashidx].latch);
 
-  if( entry = mgr->leaftable[hashidx].entry ) do
-  {
+  if( entry = mgr->leaftable[hashidx].entry )
+   do {
        latch = mgr->leafsets + entry;
        latch = mgr->leafsets + entry;
+
        if( page_no == latch->page_no )
                break;
        if( page_no == latch->page_no )
                break;
-  } while( entry = latch->next );
+   } while( entry = latch->next );
 
   //  found our entry: increment pin
 
 
   //  found our entry: increment pin
 
@@ -1049,8 +1023,7 @@ BtPage page;
        bt_mutexlock(latch->modify);
        latch->pin |= CLOCK_bit;
        latch->pin++;
        bt_mutexlock(latch->modify);
        latch->pin |= CLOCK_bit;
        latch->pin++;
-//if( !latch->leaf )
-//abort();
+
        bt_releasemutex(latch->modify);
        bt_releasemutex(mgr->leaftable[hashidx].latch);
        return latch;
        bt_releasemutex(latch->modify);
        bt_releasemutex(mgr->leaftable[hashidx].latch);
        return latch;
@@ -1059,12 +1032,8 @@ BtPage page;
   //  find and reuse unpinned entry
 
 trynext:
   //  find and reuse unpinned entry
 
 trynext:
-
   entry = bt_availleaf (mgr);
   latch = mgr->leafsets + entry;
   entry = bt_availleaf (mgr);
   latch = mgr->leafsets + entry;
-//if( latch->page_no )
-//if( !latch->leaf )
-//abort();
 
   idx = latch->page_no % mgr->leafhash;
 
 
   idx = latch->page_no % mgr->leafhash;
 
@@ -1097,9 +1066,6 @@ trynext:
   //  update permanent page area in btree from buffer pool
   //   no read-lock is required since page is not pinned.
 
   //  update permanent page area in btree from buffer pool
   //   no read-lock is required since page is not pinned.
 
-//if( latch->page_no )
-//if( !latch->leaf )
-//abort();
   if( latch->dirty )
        if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
          return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
   if( latch->dirty )
        if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
          return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
@@ -1152,6 +1118,7 @@ BtPage page;
   if( entry = mgr->hashtable[hashidx].entry ) do
   {
        latch = mgr->latchsets + entry;
   if( entry = mgr->hashtable[hashidx].entry ) do
   {
        latch = mgr->latchsets + entry;
+
        if( page_no == latch->page_no )
                break;
   } while( entry = latch->next );
        if( page_no == latch->page_no )
                break;
   } while( entry = latch->next );
@@ -1171,7 +1138,6 @@ BtPage page;
   //  find and reuse unpinned entry
 
 trynext:
   //  find and reuse unpinned entry
 
 trynext:
-
   entry = bt_availnext (mgr);
   latch = mgr->latchsets + entry;
 
   entry = bt_availnext (mgr);
   latch = mgr->latchsets + entry;
 
@@ -1206,9 +1172,6 @@ trynext:
   //  update permanent page area in btree from buffer pool
   //   no read-lock is required since page is not pinned.
 
   //  update permanent page area in btree from buffer pool
   //   no read-lock is required since page is not pinned.
 
-//if( latch->page_no )
-//if( latch->leaf )
-//abort();
   if( latch->dirty )
        if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
          return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
   if( latch->dirty )
        if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
          return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
@@ -1802,6 +1765,10 @@ BtKey *ptr;
 
        bt_lockpage(mode, set->latch, thread_no, __LINE__);
 
 
        bt_lockpage(mode, set->latch, thread_no, __LINE__);
 
+       // grab our fence key
+
+       ptr=keyptr(set->page,set->page->cnt);
+
        if( set->page->free )
                return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
 
        if( set->page->free )
                return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
 
@@ -1828,11 +1795,27 @@ BtKey *ptr;
        prevpage = set->page;
        prevmode = mode;
 
        prevpage = set->page;
        prevmode = mode;
 
+       //  if requested key is beyond our fence,
+       //      slide to the right
+
+       if( keycmp (ptr, key, len) < 0 )
+         if( page_no = bt_getid(set->page->right) )
+               continue;
+
+       //  if page is part of a delete operation,
+       //      slide to the left;
+
+       if( set->page->kill ) {
+         bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
+         page_no = bt_getid(set->page->left);
+         bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
+         continue;
+       }
+
        //  find key on page at this level
        //  and descend to requested level
 
        //  find key on page at this level
        //  and descend to requested level
 
-       if( !set->page->kill )
-        if( slot = bt_findslot (set->page, key, len) ) {
+       if( slot = bt_findslot (set->page, key, len) ) {
          if( drill == lvl )
                return slot;
 
          if( drill == lvl )
                return slot;
 
@@ -1847,7 +1830,7 @@ BtKey *ptr;
          val = valptr(set->page, slot);
 
          if( val->len == BtId )
          val = valptr(set->page, slot);
 
          if( val->len == BtId )
-               page_no = bt_getid(valptr(set->page, slot)->value);
+               page_no = bt_getid(val->value);
          else
                return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
 
          else
                return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
 
@@ -1857,9 +1840,7 @@ BtKey *ptr;
 
        //  slide right into next page
 
 
        //  slide right into next page
 
-       bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
        page_no = bt_getid(set->page->right);
        page_no = bt_getid(set->page->right);
-       bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
 
   } while( page_no );
 
 
   } while( page_no );
 
@@ -1877,8 +1858,6 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
 {
 unsigned char *freechain;
 
 {
 unsigned char *freechain;
 
-//if( (set->latch->pin & ~CLOCK_bit) > 1 )
-//abort();
        if( set->page->lvl )
                freechain = mgr->pagezero->freechain;
        else {
        if( set->page->lvl )
                freechain = mgr->pagezero->freechain;
        else {
@@ -2004,21 +1983,20 @@ uint idx;
 //     returns with page unpinned
 //     from the page pool.
 
 //     returns with page unpinned
 //     from the page pool.
 
-BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
+BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
 {
 {
-unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
+unsigned char lowerfence[BT_keyarray];
 uint page_size = mgr->page_size;
 uint page_size = mgr->page_size;
+BtPageSet right[1], temp[1];
 unsigned char value[BtId];
 unsigned char value[BtId];
-uint lvl = set->page->lvl;
-BtPageSet right[1];
-uid page_no;
+uid page_no, right2;
 BtKey *ptr;
 
        if( !lvl )
                page_size <<= mgr->leaf_xtra;
 
 BtKey *ptr;
 
        if( !lvl )
                page_size <<= mgr->leaf_xtra;
 
-       //      cache copy of fence key
-       //      to remove in parent
+       //  cache copy of original fence key
+       //      that is being deleted.
 
        ptr = keyptr(set->page, set->page->cnt);
        memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
 
        ptr = keyptr(set->page, set->page->cnt);
        memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
@@ -2031,65 +2009,71 @@ BtKey *ptr;
                right->page = bt_mappage (mgr, right->latch);
        else
                return 0;
                right->page = bt_mappage (mgr, right->latch);
        else
                return 0;
-//if( right->page->lvl )
-//abort();
-       bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
-
-       // cache copy of key to update
 
 
-       ptr = keyptr(right->page, right->page->cnt);
-       memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
+       bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
 
        if( right->page->kill )
                return mgr->line = __LINE__, mgr->err = BTERR_struct;
 
        // pull contents of right peer into our empty page
 
        if( right->page->kill )
                return mgr->line = __LINE__, mgr->err = BTERR_struct;
 
        // pull contents of right peer into our empty page
+       //      preserving our left page number.
 
 
-       memcpy (right->page->left, set->page->left, BtId);
+       bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
+       page_no = bt_getid(set->page->left);
        memcpy (set->page, right->page, page_size);
        memcpy (set->page, right->page, page_size);
+       bt_putid (set->page->left, page_no);
+       bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
+
        set->page->page_no = set->latch->page_no;
        set->latch->dirty = 1;
 
        set->page->page_no = set->latch->page_no;
        set->latch->dirty = 1;
 
-       // mark right page deleted and point it to left page
-       //      until we can post parent updates that remove access
-       //      to the deleted page.
+       if( right2 = bt_getid (right->page->right) ) {
+         if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
+               temp->page = bt_mappage (mgr, temp->latch);
+         else
+               return 0;
 
 
-       bt_putid (right->page->right, set->latch->page_no);
-       right->latch->dirty = 1;
-       right->page->kill = 1;
+      bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
+         bt_putid (temp->page->left, set->latch->page_no);
+         bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
+         bt_unpinlatch (temp->latch, thread_no, __LINE__);
+       } else {        // page is rightmost
+         bt_mutexlock (mgr->lock);
+         bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
+         bt_releasemutex(mgr->lock);
+       }
 
 
-       bt_lockpage (BtLockParent, right->latch, thread_no, __LINE__);
-       bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
+       // mark right page deleted and release lock
 
 
-       bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
-       bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
+       right->latch->dirty = 1;
+       right->page->kill = 1;
 
        // redirect higher key directly to our new node contents
 
 
        // redirect higher key directly to our new node contents
 
+       ptr = keyptr(right->page, right->page->cnt);
        bt_putid (value, set->latch->page_no);
        bt_putid (value, set->latch->page_no);
-       ptr = (BtKey*)higherfence;
 
 
-       if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
+       if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
          return mgr->err;
 
          return mgr->err;
 
-       //      delete old lower key to our node
+       bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
+
+       //      delete orignal fence key in parent
+       //      and unlock our page.
 
 
-       ptr = (BtKey*)lowerfence;
+       ptr = (BtKey *)lowerfence;
 
        if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
          return mgr->err;
 
 
        if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
          return mgr->err;
 
+       bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
+       bt_unpinlatch (set->latch, thread_no, __LINE__);
+
        //      obtain delete and write locks to right node
 
        //      obtain delete and write locks to right node
 
-       bt_unlockpage (BtLockParent, right->latch, thread_no, __LINE__);
        bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
        bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
        bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
        bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
-while( (right->latch->pin & ~CLOCK_bit) > 1 )
-sched_yield();
        bt_freepage (mgr, right, thread_no);
        bt_freepage (mgr, right, thread_no);
-//fprintf(stderr, "DelPage %d by %d at %d\n", (uint)right->latch->page_no, thread_no, __LINE__); 
-       bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
-       bt_unpinlatch (set->latch, thread_no, __LINE__);
        return 0;
 }
 
        return 0;
 }
 
@@ -2166,7 +2150,7 @@ BtVal *val;
        //      delete empty page
 
        if( !set->page->act )
        //      delete empty page
 
        if( !set->page->act )
-         return bt_deletepage (mgr, set, thread_no);
+         return bt_deletepage (mgr, set, thread_no, set->page->lvl);
 
        set->latch->dirty = 1;
        bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
 
        set->latch->dirty = 1;
        bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
@@ -2441,16 +2425,17 @@ BtVal *val;
 //     return pool entry for new right
 //     page, pinned & unlocked
 
 //     return pool entry for new right
 //     page, pinned & unlocked
 
-uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
+uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
 {
 uint page_size = mgr->page_size;
 {
 uint page_size = mgr->page_size;
+BtPageSet right[1], temp[1];
 uint cnt = 0, idx = 0, max;
 uint lvl = set->page->lvl;
 uint cnt = 0, idx = 0, max;
 uint lvl = set->page->lvl;
-BtPageSet right[1];
 BtKey *key, *ptr;
 BtVal *val, *src;
 BtPage frame;
 uid right2;
 BtKey *key, *ptr;
 BtVal *val, *src;
 BtPage frame;
 uid right2;
+uint entry;
 uint prev;
 
        if( !set->page->lvl )
 uint prev;
 
        if( !set->page->lvl )
@@ -2499,14 +2484,36 @@ uint prev;
 
        // link right node
 
 
        // link right node
 
-       if( set->latch->page_no > ROOT_page )
-               bt_putid (frame->right, bt_getid (set->page->right));
+       if( set->latch->page_no > ROOT_page ) {
+               right2 = bt_getid (set->page->right);
+               bt_putid (frame->right, right2);
+               bt_putid (frame->left, set->latch->page_no);
+       }
 
        //      get new free page and write higher keys to it.
 
        if( bt_newpage(mgr, right, frame, thread_no) )
                return 0;
 
 
        //      get new free page and write higher keys to it.
 
        if( bt_newpage(mgr, right, frame, thread_no) )
                return 0;
 
+       //      link far right's left pointer to new page
+
+       if( linkleft && set->latch->page_no > ROOT_page )
+        if( right2 ) {
+         if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
+               temp->page = bt_mappage (mgr, temp->latch);
+         else
+               return 0;
+
+      bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
+         bt_putid (temp->page->left, right->latch->page_no);
+         bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
+         bt_unpinlatch (temp->latch, thread_no, __LINE__);
+        } else {       // page is rightmost
+         bt_mutexlock (mgr->lock);
+         bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
+         bt_releasemutex(mgr->lock);
+        }
+
        // process lower keys
 
        memcpy (frame, set->page, page_size);
        // process lower keys
 
        memcpy (frame, set->page, page_size);
@@ -2520,9 +2527,6 @@ uint prev;
        cnt = 0;
        idx = 0;
 
        cnt = 0;
        idx = 0;
 
-       if( slotptr(frame, max)->type == Librarian )
-               max--;
-
        //  assemble page of smaller keys
 
        while( cnt++ < max ) {
        //  assemble page of smaller keys
 
        while( cnt++ < max ) {
@@ -2553,7 +2557,8 @@ uint prev;
        set->page->cnt = idx;
        free(frame);
 
        set->page->cnt = idx;
        free(frame);
 
-       return right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
+       entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
+       return entry;
 }
 
 //     fix keys for newly split page
 }
 
 //     fix keys for newly split page
@@ -2568,8 +2573,6 @@ uint lvl = set->page->lvl;
 BtPage page;
 BtKey *ptr;
 
 BtPage page;
 BtKey *ptr;
 
-//if( !lvl )
-//abort();
        // if current page is the root page, split it
 
        if( set->latch->page_no == ROOT_page )
        // if current page is the root page, split it
 
        if( set->latch->page_no == ROOT_page )
@@ -2626,8 +2629,7 @@ BtKey *ptr;
 BtVal *val;
 int rate;
 
 BtVal *val;
 int rate;
 
-       //      if found slot > desired slot and previous slot
-       //      is a librarian slot, use it
+       //      if previous slot is a librarian slot, use it
 
        if( slot > 1 )
          if( slotptr(set->page, slot-1)->type == Librarian )
 
        if( slot > 1 )
          if( slotptr(set->page, slot-1)->type == Librarian )
@@ -2730,8 +2732,8 @@ BtVal *val;
 
        // if librarian slot == found slot, advance to real slot
 
 
        // if librarian slot == found slot, advance to real slot
 
-       if( node->type == Librarian )
-         if( !keycmp (ptr, key, keylen) ) {
+       if( node->type == Librarian ) {
+//       if( !keycmp (ptr, key, keylen) ) {
                ptr = keyptr(set->page, ++slot);
                node = slotptr(set->page, slot);
          }
                ptr = keyptr(set->page, ++slot);
                node = slotptr(set->page, slot);
          }
@@ -2744,78 +2746,84 @@ BtVal *val;
        switch( type ) {
        case Unique:
        case Duplicate:
        switch( type ) {
        case Unique:
        case Duplicate:
-         if( keycmp (ptr, key, keylen) )
-          if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) ) {
+         if( !keycmp (ptr, key, keylen) )
+               break;
+
+         if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
            if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
                  return mgr->err;
            if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
                  return mgr->err;
+               else
+                 goto insxit;
 
 
-               bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
-               bt_unpinlatch (set->latch, thread_no, __LINE__);
-               return 0;
-          } else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
-               return mgr->err_thread = thread_no, mgr->err;
-          else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
-               return mgr->err_thread = thread_no, mgr->err;
-          else
-               continue;
-
-         // if key already exists, update value and return
+         if( entry = bt_splitpage (mgr, set, thread_no, 1) )
+           if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
+                 continue;
 
 
-         val = valptr(set->page, slot);
+         return mgr->err_thread = thread_no, mgr->err;
 
 
-         if( val->len >= vallen ) {
-               if( slotptr(set->page, slot)->dead )
-                       set->page->act++;
-               node->type = type;
-               node->dead = 0;
+       case Update:
+         if( keycmp (ptr, key, keylen) )
+               goto insxit;
 
 
-               set->page->garbage += val->len - vallen;
-               set->latch->dirty = 1;
-               val->len = vallen;
-               memcpy (val->value, value, vallen);
-               bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
-               bt_unpinlatch (set->latch, thread_no, __LINE__);
-               return 0;
-         }
+         break;
+       }
 
 
-         //  new update value doesn't fit in existing value area
-         //    make sure page has room
+       // if key already exists, update value and return
 
 
-         if( !node->dead )
-               set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
-         else
-               set->page->act++;
+       val = valptr(set->page, slot);
 
 
+       if( val->len >= vallen ) {
+         if( node->dead )
+               set->page->act++;
          node->type = type;
          node->dead = 0;
 
          node->type = type;
          node->dead = 0;
 
-         if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
-          if( !(entry = bt_splitpage (mgr, set, thread_no)) )
-               return mgr->err_thread = thread_no, mgr->err;
-          else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
-               return mgr->err_thread = thread_no, mgr->err;
-          else
+         set->page->garbage += val->len - vallen;
+         set->latch->dirty = 1;
+         val->len = vallen;
+         memcpy (val->value, value, vallen);
+         goto insxit;
+       }
+
+       //  new update value doesn't fit in existing value area
+       //      make sure page has room
+
+       if( !node->dead )
+         set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
+       else
+         set->page->act++;
+
+       node->type = type;
+       node->dead = 0;
+
+       if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
+         break;
+
+       if( entry = bt_splitpage (mgr, set, thread_no, 1) )
+         if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
                continue;
 
                continue;
 
-         //  copy key and value onto page and update slot
+       return mgr->err_thread = thread_no, mgr->err;
+  }
+
+  //  copy key and value onto page and update slot
 
 
-         set->page->min -= vallen + sizeof(BtVal);
-         val = (BtVal*)((unsigned char *)set->page + set->page->min);
-         memcpy (val->value, value, vallen);
-         val->len = vallen;
+  set->page->min -= vallen + sizeof(BtVal);
+  val = (BtVal*)((unsigned char *)set->page + set->page->min);
+  memcpy (val->value, value, vallen);
+  val->len = vallen;
 
 
-         set->latch->dirty = 1;
-         set->page->min -= keylen + sizeof(BtKey);
-         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
-         memcpy (ptr->key, key, keylen);
-         ptr->len = keylen;
+  set->latch->dirty = 1;
+  set->page->min -= keylen + sizeof(BtKey);
+  ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
+  memcpy (ptr->key, key, keylen);
+  ptr->len = keylen;
        
        
-         node->off = set->page->min;
-         bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
-         bt_unpinlatch (set->latch, thread_no, __LINE__);
-         return 0;
-    }
-  }
+  slotptr(set->page,slot)->off = set->page->min;
+
+insxit:
+  bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
+  bt_unpinlatch (set->latch, thread_no, __LINE__);
   return 0;
 }
 
   return 0;
 }
 
@@ -2876,7 +2884,7 @@ uint entry, slot;
 
        //  split page
 
 
        //  split page
 
-       if( entry = bt_splitpage (mgr, set, thread_no) )
+       if( entry = bt_splitpage (mgr, set, thread_no, 0) )
          latch = mgr->leafsets + entry;
        else
          return mgr->err_thread = thread_no, mgr->err;
          latch = mgr->leafsets + entry;
        else
          return mgr->err_thread = thread_no, mgr->err;
@@ -2884,7 +2892,6 @@ uint entry, slot;
        //      splice right page into split chain
        //      and WriteLock it
 
        //      splice right page into split chain
        //      and WriteLock it
 
-//fprintf(stderr, "SplitPg %d by %d at %d\n", (uint)latch->page_no, thread_no, __LINE__); 
        bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
        latch->split = set->latch->split;
        set->latch->split = entry;
        bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
        latch->split = set->latch->split;
        set->latch->split = entry;
@@ -3001,7 +3008,7 @@ int type;
   // promote page into larger btree
 
   if( bt->main )
   // promote page into larger btree
 
   if( bt->main )
-   while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->thread_no )
+   while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
        if( bt_txnpromote (bt) )
          return bt->mgr->err;
 
        if( bt_txnpromote (bt) )
          return bt->mgr->err;
 
@@ -3014,19 +3021,16 @@ int type;
 
 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
 {
 
 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
 {
-unsigned char fencekey[BT_keyarray], prvfence[BT_keyarray];
 uint src, idx, samepage, entry;
 BtPageSet set[1], prev[1];
 unsigned char value[BtId];
 BtLatchSet *latch;
 uid right_page_no;
 AtomicTxn *locks;
 uint src, idx, samepage, entry;
 BtPageSet set[1], prev[1];
 unsigned char value[BtId];
 BtLatchSet *latch;
 uid right_page_no;
 AtomicTxn *locks;
-BtKey *key, *ptr, *prv, *cur;
+BtKey *key, *ptr;
 BtPage page;
 BtVal *val;
 
 BtPage page;
 BtVal *val;
 
-uint slot, prvslot;
-
   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
 
   // Load the leaf page for each key
   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
 
   // Load the leaf page for each key
@@ -3043,26 +3047,12 @@ uint slot, prvslot;
          samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
 
        if( !samepage )
          samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
 
        if( !samepage )
-         if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
-//memcpy (prvfence, fencekey, 10),
-//src>1 ?  memcpy (fencekey, ptr->key, 10):NULL,
-//prv = ptr,
-//cur = keyptr(set->page, slot),
+         if( bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
                ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
          else
                return mgr->err;
 
        entry = set->latch - mgr->leafsets;
                ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
          else
                return mgr->err;
 
        entry = set->latch - mgr->leafsets;
-
-       //  is this actually on the same page?
-
-//if( !samepage )
-//     for( idx = src; --idx; )
-//if( entry == locks[idx].entry ) {
-//fprintf(stderr, "Dup page %d by thread %d\n", (uint)set->latch->page_no, thread_no);
-//             abort();
-//       }
-
        locks[src].reuse = samepage;
        locks[src].entry = entry;
 
        locks[src].reuse = samepage;
        locks[src].entry = entry;
 
@@ -3148,7 +3138,7 @@ uint slot, prvslot;
 
          ptr = keyptr(prev->page,prev->page->cnt);
          bt_putid (value, prev->latch->page_no);
 
          ptr = keyptr(prev->page,prev->page->cnt);
          bt_putid (value, prev->latch->page_no);
-//fprintf(stderr, "KeyIns for %d by %d at %d\n", (uint)prev->latch->page_no, thread_no, __LINE__);
+
          if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
                return mgr->err;
 
          if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
                return mgr->err;
 
@@ -3193,8 +3183,7 @@ uint slot, prvslot;
          ptr = keyptr(prev->page,prev->page->cnt);
          bt_putid (value, prev->latch->page_no);
 
          ptr = keyptr(prev->page,prev->page->cnt);
          bt_putid (value, prev->latch->page_no);
 
-//fprintf(stderr, "KeyIns for %d by %d at %d\n", (uint)prev->latch->page_no, thread_no, __LINE__);
-         if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
+         if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
                return mgr->err;
 
          if( lsm )
                return mgr->err;
 
          if( lsm )
@@ -3228,7 +3217,7 @@ uint slot, prvslot;
        // any and all splits were reversed, and the
        // master page located in prev is empty, delete it
 
        // any and all splits were reversed, and the
        // master page located in prev is empty, delete it
 
-       if( bt_deletepage (mgr, prev, thread_no) )
+       if( bt_deletepage (mgr, prev, thread_no, 0) )
                return mgr->err;
   }
 
                return mgr->err;
   }
 
@@ -3259,6 +3248,11 @@ BtVal *val;
 
        set->latch = bt->mgr->leafsets + entry;
 
 
        set->latch = bt->mgr->leafsets + entry;
 
+       //  skip this entry if it has never been used
+
+       if( !set->latch->page_no )
+         continue;
+
        if( !bt_mutextry(set->latch->modify) )
                continue;
 
        if( !bt_mutextry(set->latch->modify) )
                continue;
 
@@ -3271,9 +3265,9 @@ BtVal *val;
 
        set->page = bt_mappage (bt->mgr, set->latch);
 
 
        set->page = bt_mappage (bt->mgr, set->latch);
 
-       // entry never used or has no left or right sibling
+       // entry has no right sibling
 
 
-       if( !set->latch->page_no || !bt_getid (set->page->right) ) {
+       if( !bt_getid (set->page->right) ) {
          bt_releasemutex(set->latch->modify);
          continue;
        }
          bt_releasemutex(set->latch->modify);
          continue;
        }
@@ -3304,7 +3298,7 @@ fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_n
 
        //  now delete the page
 
 
        //  now delete the page
 
-       if( bt_deletepage (bt->mgr, set, bt->thread_no) )
+       if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
                fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
 
        return bt->mgr->err;
                fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
 
        return bt->mgr->err;
@@ -3494,28 +3488,37 @@ struct timeval tv[1];
 }
 #endif
 
 }
 #endif
 
-void bt_poolaudit (BtMgr *mgr)
+void bt_poolaudit (BtMgr *mgr, char *type)
 {
 {
-BtLatchSet *latch;
-uint entry = 0;
+BtLatchSet *latch, test[1];
+uint entry;
+
+       memset (test, 0, sizeof(test));
+
+       if( memcmp (test, mgr->latchsets, sizeof(test)) )
+               fprintf(stderr, "%s latchset zero overwritten\n", type);
+
+       if( memcmp (test, mgr->leafsets, sizeof(test)) )
+               fprintf(stderr, "%s leafset zero overwritten\n", type);
 
 
-       while( ++entry < mgr->latchtotal ) {
+       for( entry = 0; ++entry < mgr->latchtotal; ) {
                latch = mgr->latchsets + entry;
 
                latch = mgr->latchsets + entry;
 
-               if( *latch->readwr->wrt->value )
-                       fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
+               if( *latch->modify->value )
+                       fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
 
 
-//             if( *latch->access->bits->tid )
-//                     fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
+               if( latch->pin & ~CLOCK_bit )
+                       fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
+       }
 
 
-//             if( *latch->parent->bits->tid )
-//                     fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
+       for( entry = 0; ++entry < mgr->leaftotal; ) {
+               latch = mgr->leafsets + entry;
 
                if( *latch->modify->value )
 
                if( *latch->modify->value )
-                       fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
+                       fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
 
                if( latch->pin & ~CLOCK_bit )
 
                if( latch->pin & ~CLOCK_bit )
-                       fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
+                       fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
        }
 }
 
        }
 }
 
@@ -3893,12 +3896,14 @@ BtKey *ptr;
        for( idx = 0; idx < cnt; idx++ )
                CloseHandle(threads[idx]);
 #endif
        for( idx = 0; idx < cnt; idx++ )
                CloseHandle(threads[idx]);
 #endif
-       bt_poolaudit(mgr);
+       bt_poolaudit(mgr, "cache");
 
        if( main )
 
        if( main )
-               bt_poolaudit(main);
+               bt_poolaudit(main, "main");
 
 
-       fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
+       fprintf(stderr, "cache %d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
+       if( main )
+               fprintf(stderr, "main %d reads %d writes %d found\n", main->reads, main->writes, main->found);
 
        if( main )
                bt_mgrclose (main);
 
        if( main )
                bt_mgrclose (main);