]> pd.if.org Git - btree/blobdiff - threadskv10.c
Merge branch 'master' of https://github.com/malbrain/Btree-source-code
[btree] / threadskv10.c
index cd307f6e9a24b60340f14737e0ee42d0a675ebb8..194757b25f2aa192b379b01511ef567338dd76ad 100644 (file)
@@ -9,7 +9,7 @@
 //     redo log for failure recovery
 //     and LSM B-trees for write optimization
 
-// 11 OCT 2014
+// 15 OCT 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -113,15 +113,15 @@ typedef enum{
 typedef struct {
   union {
        struct {
-         volatile uint xlock:1;        // one writer has exclusive lock
-         volatile uint wrt:31;         // count of other writers waiting
+         volatile ushort xlock[1];             // one writer has exclusive lock
+         volatile ushort wrt[1];               // count of other writers waiting
        } bits[1];
        uint value[1];
   };
 } BtMutexLatch;
 
 #define XCL 1
-#define WRT 2
+#define WRT 65536
 
 //     definition for phase-fair reader/writer lock implementation
 
@@ -130,16 +130,20 @@ typedef struct {
        volatile ushort rout[1];
        volatile ushort ticket[1];
        volatile ushort serving[1];
-       volatile ushort tid[1];
-       volatile ushort dup[1];
 } RWLock;
 
-//     write only lock
+//     write only reentrant lock
 
 typedef struct {
-       BtMutexLatch xcl[1];
-       volatile ushort tid[1];
-       volatile ushort dup[1];
+  BtMutexLatch xcl[1];
+  union {
+       struct {
+               volatile ushort tid[1];
+               volatile ushort dup[1];
+       } bits[1];
+       uint value[1];
+  };
+  volatile uint waiters[1];
 } WOLock;
 
 #define PHID 0x1
@@ -170,11 +174,11 @@ typedef struct {
        WOLock parent[1];               // Posting of fence key in parent
        WOLock atomic[1];               // Atomic update in progress
        uint split;                             // right split page atomic insert
-       uint entry;                             // entry slot in latch table
        uint next;                              // next entry in hash table chain
        uint prev;                              // prev entry in hash table chain
        ushort pin;                             // number of accessing threads
-       unsigned char dirty;    // page in cache is dirty (atomic set)
+       unsigned char dirty;    // page in cache is dirty (atomic setable)
+       unsigned char promote;  // page in cache is dirty (atomic setable)
        BtMutexLatch modify[1]; // modify entry lite latch
 } BtLatchSet;
 
@@ -240,7 +244,7 @@ typedef struct {
 
 //     note that this structure size
 //     must be a multiple of 8 bytes
-//     in order to place dups correctly.
+//     in order to place PageZero correctly.
 
 typedef struct BtPage_ {
        uint cnt;                                       // count of keys in page
@@ -269,9 +273,9 @@ typedef struct {
 
 typedef struct {
        struct BtPage_ alloc[1];                // next page_no in right ptr
-       unsigned long long dups[1];             // global duplicate key uniqueifier
        unsigned char freechain[BtId];  // head of free page_nos chain
-       unsigned long long activepages; // number of active pages pages
+       unsigned long long activepages; // number of active pages
+       uint redopages;                                 // number of redo pages in file
 } BtPageZero;
 
 //     The object structure for Btree access
@@ -299,7 +303,6 @@ typedef struct {
        uint latchhash;                         // number of latch hash table slots
        uint latchvictim;                       // next latch entry to examine
        uint latchpromote;                      // next latch entry to promote
-       uint redopages;                         // size of recovery buff in pages
        uint redolast;                          // last msync size of recovery buff
        uint redoend;                           // eof/end element in recovery buff
        int err;                                        // last error
@@ -322,6 +325,15 @@ typedef struct {
        unsigned char key[BT_keyarray]; // last found complete key
 } BtDb;
 
+//     atomic txn structures
+
+typedef struct {
+       logseqno reqlsn;        // redo log seq no required
+       uint entry;                     // latch table entry number
+       uint slot:31;           // page slot number
+       uint reuse:1;           // reused previous page
+} AtomicTxn;
+
 //     Catastrophic errors
 
 typedef enum {
@@ -365,7 +377,7 @@ typedef struct {
 
 extern void bt_close (BtDb *bt);
 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
-extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit);
+extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
@@ -385,7 +397,8 @@ extern void bt_mgrclose (BtMgr *mgr);
 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
 
-//     transaction functions
+//     atomic transaction functions
+BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
 BTERR bt_txnpromote (BtDb *bt);
 
 //  The page is allocated from low and hi ends.
@@ -457,15 +470,6 @@ int i;
        return id;
 }
 
-uid bt_newdup (BtMgr *mgr)
-{
-#ifdef unix
-       return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1;
-#else
-       return _InterlockedIncrement64(mgr->pagezero->dups, 1);
-#endif
-}
-
 //     lite weight spin lock Latch Manager
 
 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
@@ -481,18 +485,18 @@ uint slept = 0;
   while( 1 ) {
        *prev->value = __sync_fetch_and_or(latch->value, XCL);
 
-       if( !prev->bits->xlock ) {                      // did we set XCL bit?
+       if( !*prev->bits->xlock ) {                     // did we set XCL?
            if( slept )
                  __sync_fetch_and_sub(latch->value, WRT);
                return;
        }
 
        if( !slept ) {
-               prev->bits->wrt++;
+               *prev->bits->wrt += 1;
                __sync_fetch_and_add(latch->value, WRT);
        }
 
-       sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
+       sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
        slept = 1;
   }
 }
@@ -508,9 +512,9 @@ BtMutexLatch prev[1];
 
        *prev->value = __sync_fetch_and_or(latch->value, XCL);
 
-       //      take write access if exclusive bit is clear
+       //      take write access if exclusive bit was clear
 
-       return !prev->bits->xlock;
+       return !*prev->bits->xlock;
 }
 
 //     clear write mode
@@ -521,32 +525,82 @@ BtMutexLatch prev[1];
 
        *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
 
-       if( prev->bits->wrt )
-         sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
+       if( *prev->bits->wrt )
+         sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
 }
 
-//     Write-Only Queue Lock
+//     Write-Only Reentrant Lock
 
 void WriteOLock (WOLock *lock, ushort tid)
 {
-       if( *lock->tid == tid ) {
-               *lock->dup += 1;
+uint prev, waited = 0;
+
+  while( 1 ) {
+       bt_mutexlock(lock->xcl);
+
+       if( waited )
+               *lock->waiters -= 1;
+
+       if( *lock->bits->tid == tid ) {
+               *lock->bits->dup += 1;
+               bt_releasemutex(lock->xcl);
+               return;
+       }
+       if( !*lock->bits->tid ) {
+               *lock->bits->tid = tid;
+               bt_releasemutex(lock->xcl);
                return;
        }
 
-       bt_mutexlock(lock->xcl);
-       *lock->tid = tid;
+       waited = 1;
+       *lock->waiters += 1;
+       prev = *lock->value;
+
+       bt_releasemutex(lock->xcl);
+       
+       sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, prev, NULL, NULL, QueWr );
+  }
 }
 
 void WriteORelease (WOLock *lock)
 {
-       if( *lock->dup ) {
-               *lock->dup -= 1;
-               return;
+  bt_mutexlock(lock->xcl);
+
+  if( *lock->bits->dup ) {
+       *lock->bits->dup -= 1;
+    bt_releasemutex(lock->xcl);
+       return;
+  }
+
+  *lock->bits->tid = 0;
+
+  if( *lock->waiters )
+       sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueWr );
+  bt_releasemutex(lock->xcl);
+}
+
+//     clear lock of holders and waiters
+
+ClearWOLock (WOLock *lock)
+{
+  while( 1 ) {
+       bt_mutexlock(lock->xcl);
+
+       if( *lock->waiters ) {
+               bt_releasemutex(lock->xcl);
+               sched_yield();
+               continue;
+       }
+
+       if( *lock->bits->tid ) {
+               bt_releasemutex(lock->xcl);
+               sched_yield();
+               continue;
        }
 
-       *lock->tid = 0;
        bt_releasemutex(lock->xcl);
+       return;
+  }
 }
 
 //     Phase-Fair reader/writer lock implementation
@@ -555,10 +609,6 @@ void WriteLock (RWLock *lock, ushort tid)
 {
 ushort w, r, tix;
 
-       if( *lock->tid == tid ) {
-               *lock->dup += 1;
-               return;
-       }
 #ifdef unix
        tix = __sync_fetch_and_add (lock->ticket, 1);
 #else
@@ -585,17 +635,10 @@ ushort w, r, tix;
 #else
                SwitchToThread();
 #endif
-       *lock->tid = tid;
 }
 
 void WriteRelease (RWLock *lock)
 {
-       if( *lock->dup ) {
-               *lock->dup -= 1;
-               return;
-       }
-
-       *lock->tid = 0;
 #ifdef unix
        __sync_fetch_and_and (lock->rin, ~MASK);
 #else
@@ -611,12 +654,6 @@ int ReadTry (RWLock *lock, ushort tid)
 {
 ushort w;
 
-       //  OK if write lock already held by same thread
-
-       if( *lock->tid == tid ) {
-               *lock->dup += 1;
-               return 1;
-       }
 #ifdef unix
        w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
 #else
@@ -639,10 +676,6 @@ void ReadLock (RWLock *lock, ushort tid)
 {
 ushort w;
 
-       if( *lock->tid == tid ) {
-               *lock->dup += 1;
-               return;
-       }
 #ifdef unix
        w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
 #else
@@ -659,11 +692,6 @@ ushort w;
 
 void ReadRelease (RWLock *lock)
 {
-       if( *lock->dup ) {
-               *lock->dup -= 1;
-               return;
-       }
-
 #ifdef unix
        __sync_fetch_and_add (lock->rout, RINC);
 #else
@@ -676,9 +704,9 @@ void ReadRelease (RWLock *lock)
 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
 {
 uint cnt3 = 0, cnt2 = 0, cnt = 0;
+uint entry, segment;
 BtLatchSet *latch;
 BtPage page;
-uint entry;
 
   //   flush dirty pool pages to the btree
 
@@ -690,7 +718,7 @@ fprintf(stderr, "Start flushlsn  ");
        bt_lockpage(BtLockRead, latch, thread_no);
 
        if( latch->dirty ) {
-         bt_writepage(mgr, page, latch->page_no, 0);
+         bt_writepage(mgr, page, latch->page_no);
          latch->dirty = 0, cnt++;
        }
 if( latch->pin & ~CLOCK_bit )
@@ -700,7 +728,9 @@ cnt2++;
   }
 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
 fprintf(stderr, "begin sync");
-       sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER);
+       for( segment = 0; segment < mgr->segments; segment++ )
+         if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
+               fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
 fprintf(stderr, " end sync\n");
 }
 
@@ -714,8 +744,6 @@ uint offset = 0;
 BtKey *key;
 BtVal *val;
 
-  pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
-
   hdr = (BtLogHdr *)mgr->redobuff;
   mgr->flushlsn = hdr->lsn;
 
@@ -738,11 +766,11 @@ BtVal *val;
 }
 
 //     recovery manager -- append new entry to recovery log
-//     flush to disk when it overflows.
+//     flush dirty pages to disk when it overflows.
 
 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
 {
-uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
+uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
 uint amt = sizeof(BtLogHdr);
 BtLogHdr *hdr, *eof;
 uint last, end;
@@ -757,7 +785,8 @@ uint last, end;
 
        if( amt > size - mgr->redoend ) {
          mgr->flushlsn = mgr->lsn;
-         msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
+         if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
+               fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
          mgr->redolast = 0;
          mgr->redoend = 0;
          bt_flushlsn(mgr, thread_no);
@@ -788,22 +817,25 @@ uint last, end;
        memset (eof, 0, sizeof(BtLogHdr));
        eof->lsn = mgr->lsn;
 
-       last = mgr->redolast & 0xfff;
+       last = mgr->redolast & ~0xfff;
        end = mgr->redoend;
-       mgr->redolast = end;
 
-       bt_releasemutex(mgr->redo);
+       if( end - last + sizeof(BtLogHdr) >= 32768 )
+         if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
+               fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+         else
+               mgr->redolast = end;
 
-       msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
+       bt_releasemutex(mgr->redo);
        return hdr->lsn;
 }
 
 //     recovery manager -- append transaction to recovery log
-//     flush to disk when it overflows.
+//     flush dirty pages to disk when it overflows.
 
 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
 {
-uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
+uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
 uint amt = 0, src, type;
 BtLogHdr *hdr, *eof;
 uint last, end;
@@ -827,7 +859,8 @@ BtVal *val;
 
        if( amt > size - mgr->redoend ) {
          mgr->flushlsn = mgr->lsn;
-         msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
+         if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
+               fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
          mgr->redolast = 0;
          mgr->redoend = 0;
          bt_flushlsn (mgr, thread_no);
@@ -876,30 +909,52 @@ BtVal *val;
        memset (eof, 0, sizeof(BtLogHdr));
        eof->lsn = lsn;
 
-       last = mgr->redolast & 0xfff;
+       last = mgr->redolast & ~0xfff;
        end = mgr->redoend;
-       mgr->redolast = end;
-       bt_releasemutex(mgr->redo);
 
-       msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
+       if( end - last + sizeof(BtLogHdr) >= 32768 )
+         if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
+               fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+         else
+               mgr->redolast = end;
+
        bt_releasemutex(mgr->redo);
        return lsn;
 }
 
+//     sync a single btree page to disk
+
+BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
+{
+uint segment = latch->page_no >> 16;
+BtPage perm;
+
+       if( bt_writepage (mgr, page, latch->page_no) )
+               return mgr->err;
+
+       perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
+
+       if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
+         fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+
+       latch->dirty = 0;
+       return 0;
+}
+
 //     read page into buffer pool from permanent location in Btree file
 
 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
 {
 int flag = PROT_READ | PROT_WRITE;
-uint segment = page_no >> 32;
-unsigned char *perm;
+uint segment = page_no >> 16;
+BtPage perm;
 
   while( 1 ) {
        if( segment < mgr->segments ) {
-         perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
-               memcpy (page, perm, mgr->page_size);
-if( page->page_no != page_no )
+         perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
+if( perm->page_no != page_no )
 abort();
+               memcpy (page, perm, mgr->page_size);
                mgr->reads++;
                return 0;
        }
@@ -921,18 +976,18 @@ abort();
 //     write page to permanent location in Btree file
 //     clear the dirty bit
 
-BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit)
+BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
 {
 int flag = PROT_READ | PROT_WRITE;
-uint segment = page_no >> 32;
-unsigned char *perm;
+uint segment = page_no >> 16;
+BtPage perm;
 
   while( 1 ) {
        if( segment < mgr->segments ) {
-         perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
+         perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
+if( page_no > LEAF_page && perm->page_no != page_no)
+abort();
                memcpy (perm, page, mgr->page_size);
-               if( syncit )
-                       msync (perm, mgr->page_size, MS_SYNC);
                mgr->writes++;
                return 0;
        }
@@ -966,7 +1021,8 @@ void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
 
 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
 {
-BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool);
+uid entry = latch - mgr->latchsets;
+BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
 
   return page;
 }
@@ -1082,7 +1138,7 @@ trynext:
   //   no read-lock is required since page is not pinned.
 
   if( latch->dirty )
-       if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
+       if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
          return mgr->line = __LINE__, NULL;
        else
          latch->dirty = 0;
@@ -1091,7 +1147,7 @@ trynext:
        memcpy (page, contents, mgr->page_size);
        latch->dirty = 1;
   } else if( bt_readpage (mgr, page, page_no) )
-         return mgr->line = __LINE__, NULL;
+       return mgr->line = __LINE__, NULL;
 
   //  link page as head of hash table chain
   //  if this is a never before used entry,
@@ -1112,7 +1168,6 @@ trynext:
 
   latch->pin = CLOCK_bit | 1;
   latch->page_no = page_no;
-  latch->entry = entry;
   latch->split = 0;
 
   bt_releasemutex (latch->modify);
@@ -1136,8 +1191,6 @@ uint slot;
        if( mgr->redoend ) {
                eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
                memset (eof, 0, sizeof(BtLogHdr));
-
-               pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
        }
 
        //      write remaining dirty pool pages to the btree
@@ -1147,24 +1200,19 @@ uint slot;
                latch = mgr->latchsets + slot;
 
                if( latch->dirty ) {
-                       bt_writepage(mgr, page, latch->page_no, 0);
+                       bt_writepage(mgr, page, latch->page_no);
                        latch->dirty = 0, num++;
                }
        }
 
-       //      flush last batch to disk and clear
-       //      redo recovery buffer on disk.
-
-       fdatasync (mgr->idx);
+       //      clear redo recovery buffer on disk.
 
-       if( mgr->redopages ) {
+       if( mgr->pagezero->redopages ) {
                eof = (BtLogHdr *)mgr->redobuff;
                memset (eof, 0, sizeof(BtLogHdr));
                eof->lsn = mgr->lsn;
-
-               pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
-
-               sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+               if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
+                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
        }
 
        fprintf(stderr, "%d buffer pool pages flushed\n", num);
@@ -1183,7 +1231,6 @@ uint slot;
        CloseHandle(mgr->hpool);
 #endif
 #ifdef unix
-       free (mgr->redobuff);
        close (mgr->idx);
        free (mgr);
 #else
@@ -1303,17 +1350,18 @@ BtVal *val;
        memset (pagezero, 0, 1 << bits);
        pagezero->alloc->lvl = MIN_lvl - 1;
        pagezero->alloc->bits = mgr->page_bits;
-       bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
+       pagezero->redopages = redopages;
+
+       bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
        pagezero->activepages = 2;
 
        //  initialize left-most LEAF page in
        //      alloc->left and count of active leaf pages.
 
        bt_putid (pagezero->alloc->left, LEAF_page);
+       ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
 
-       ftruncate (mgr->idx, REDO_page << mgr->page_bits);
-
-       if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) {
+       if( bt_writepage (mgr, pagezero->alloc, 0) ) {
                fprintf (stderr, "Unable to create btree page zero\n");
                return bt_mgrclose (mgr), NULL;
        }
@@ -1340,7 +1388,7 @@ BtVal *val;
                pagezero->alloc->act = 1;
                pagezero->alloc->page_no = MIN_lvl - lvl;
 
-               if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) {
+               if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
                        fprintf (stderr, "Unable to create btree page\n");
                        return bt_mgrclose (mgr), NULL;
                }
@@ -1353,25 +1401,28 @@ mgrlatch:
        VirtualFree (pagezero, 0, MEM_RELEASE);
 #endif
 #ifdef unix
-       // mlock the pagezero page
+       // mlock the first segment of 64K pages
 
        flag = PROT_READ | PROT_WRITE;
-       mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
-       if( mgr->pagezero == MAP_FAILED ) {
-               fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
+       mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
+       mgr->segments = 1;
+
+       if( mgr->pages[0] == MAP_FAILED ) {
+               fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
        }
+
+       mgr->pagezero = (BtPageZero *)mgr->pages[0];
        mlock (mgr->pagezero, mgr->page_size);
 
+       mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
+       mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
+
        mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
        if( mgr->pagepool == MAP_FAILED ) {
                fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
        }
-       if( mgr->redopages = redopages ) {
-               ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits);
-               mgr->redobuff = valloc (redopages * mgr->page_size);
-       }
 #else
        flag = PAGE_READWRITE;
        mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
@@ -1401,8 +1452,6 @@ mgrlatch:
                fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
                return bt_mgrclose (mgr), NULL;
        }
-       if( mgr->redopages = redopages )
-               mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
 #endif
 
        mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
@@ -1484,6 +1533,10 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no)
                WriteOLock (latch->atomic, thread_no);
                ReadLock (latch->readwr, thread_no);
                break;
+       case BtLockAtomic | BtLockWrite:
+               WriteOLock (latch->atomic, thread_no);
+               WriteLock (latch->readwr, thread_no);
+               break;
        }
 }
 
@@ -1514,6 +1567,10 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch)
                WriteORelease (latch->atomic);
                ReadRelease (latch->readwr);
                break;
+       case BtLockAtomic | BtLockWrite:
+               WriteORelease (latch->atomic);
+               WriteRelease (latch->readwr);
+               break;
        }
 }
 
@@ -1530,7 +1587,7 @@ int blk;
        bt_mutexlock(mgr->lock);
 
        // use empty chain first
-       // else allocate empty page
+       // else allocate new page
 
        if( page_no = bt_getid(mgr->pagezero->freechain) ) {
                if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
@@ -1538,13 +1595,24 @@ int blk;
                else
                        return mgr->line = __LINE__, mgr->err = BTERR_struct;
 
-               bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
                mgr->pagezero->activepages++;
+               bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
 
-               bt_releasemutex(mgr->lock);
+               //  the page is currently free and this
+               //  will keep bt_txnpromote out.
 
-               memcpy (set->page, contents, mgr->page_size);
+               //      contents will replace this bit
+               //  and pin will keep bt_txnpromote out
+
+               contents->page_no = page_no;
                set->latch->dirty = 1;
+
+               memcpy (set->page, contents, mgr->page_size);
+
+               if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
+                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+
+               bt_releasemutex(mgr->lock);
                return 0;
        }
 
@@ -1555,11 +1623,16 @@ int blk;
        //      extend file into new page.
 
        mgr->pagezero->activepages++;
-       contents->page_no = page_no;
-
-       pwrite (mgr->idx, contents, mgr->page_size, (uid)(page_no + 1) << mgr->page_bits);
+       if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
+         fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
        bt_releasemutex(mgr->lock);
 
+       //      keep bt_txnpromote out of this page
+
+       contents->free = 1;
+       contents->page_no = page_no;
+       pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
+
        //      don't load cache from btree page, load it from contents
 
        if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
@@ -1567,6 +1640,9 @@ int blk;
        else
                return mgr->err;
 
+       // now pin will keep bt_txnpromote out
+
+       set->page->free = 0;
        return 0;
 }
 
@@ -1608,10 +1684,11 @@ uint good = 0;
 
 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
 {
-uid page_no = ROOT_page, prevpage = 0;
+uid page_no = ROOT_page, prevpage_no = 0;
 uint drill = 0xff, slot;
 BtLatchSet *prevlatch;
 uint mode, prevmode;
+BtPage prevpage;
 BtVal *val;
 
   //  start at root of btree and drill down
@@ -1629,13 +1706,15 @@ BtVal *val;
          bt_lockpage(BtLockAccess, set->latch, thread_no);
 
        set->page = bt_mappage (mgr, set->latch);
+if( set->latch->promote )
+abort();
 
        //      release & unpin parent or left sibling page
 
-       if( prevpage ) {
+       if( prevpage_no ) {
          bt_unlockpage(prevmode, prevlatch);
          bt_unpinlatch (mgr, prevlatch);
-         prevpage = 0;
+         prevpage_no = 0;
        }
 
        // obtain mode lock using lock chaining through AccessLock
@@ -1663,8 +1742,9 @@ BtVal *val;
                }
        }
 
-       prevpage = set->latch->page_no;
+       prevpage_no = set->latch->page_no;
        prevlatch = set->latch;
+       prevpage = set->page;
        prevmode = mode;
 
        //  find key on page at this level
@@ -1707,6 +1787,7 @@ BtVal *val;
 
 //     return page to free list
 //     page must be delete & write locked
+//     and have no keys pointing to it.
 
 void bt_freepage (BtMgr *mgr, BtPageSet *set)
 {
@@ -1718,20 +1799,24 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set)
 
        memcpy(set->page->right, mgr->pagezero->freechain, BtId);
        bt_putid(mgr->pagezero->freechain, set->latch->page_no);
+       set->latch->promote = 0;
        set->latch->dirty = 1;
        set->page->free = 1;
 
        // decrement active page count
-       // and unlock allocation page
 
        mgr->pagezero->activepages--;
-       bt_releasemutex (mgr->lock);
+
+       if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
+         fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
 
        // unlock released page
+       // and unlock allocation page
 
        bt_unlockpage (BtLockDelete, set->latch);
        bt_unlockpage (BtLockWrite, set->latch);
        bt_unpinlatch (mgr, set->latch);
+       bt_releasemutex (mgr->lock);
 }
 
 //     a fence key was deleted from a page
@@ -1826,7 +1911,10 @@ uint idx;
 //  delete a page and manage keys
 //  call with page writelocked
 
-BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
+//     returns with page removed
+//     from the page pool.
+
+BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, int delkey)
 {
 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
 unsigned char value[BtId];
@@ -1863,6 +1951,7 @@ BtKey *ptr;
        // pull contents of right peer into our empty page
 
        memcpy (set->page, right->page, mgr->page_size);
+       set->page->page_no = set->latch->page_no;
        set->latch->dirty = 1;
 
        // mark right page deleted and point it to left page
@@ -1891,7 +1980,8 @@ BtKey *ptr;
 
        ptr = (BtKey*)lowerfence;
 
-       if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
+       if( delkey )
+        if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
          return mgr->err;
 
        //      obtain delete and write locks to right node
@@ -1902,6 +1992,7 @@ BtKey *ptr;
        bt_freepage (mgr, right);
 
        bt_unlockpage (BtLockParent, set->latch);
+       bt_unpinlatch (mgr, set->latch);
        return 0;
 }
 
@@ -1956,9 +2047,12 @@ BtVal *val;
                        break;
          }
 
+       if( !found )
+               return 0;
+
        //      did we delete a fence key in an upper level?
 
-       if( found && lvl && set->page->act && fence )
+       if( lvl && set->page->act && fence )
          if( bt_fixfence (mgr, set, lvl, thread_no) )
                return mgr->err;
          else
@@ -1966,7 +2060,7 @@ BtVal *val;
 
        //      do we need to collapse root?
 
-       if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
+       if( set->latch->page_no == ROOT_page && set->page->act == 1 )
          if( bt_collapseroot (mgr, set, thread_no) )
                return mgr->err;
          else
@@ -1974,14 +2068,11 @@ BtVal *val;
 
        //      delete empty page
 
-       if( !set->page->act ) {
-         if( bt_deletepage (mgr, set, thread_no) )
-               return mgr->err;
-       } else {
-         set->latch->dirty = 1;
-         bt_unlockpage(BtLockWrite, set->latch);
-       }
+       if( !set->page->act )
+         return bt_deletepage (mgr, set, thread_no, 1);
 
+       set->latch->dirty = 1;
+       bt_unlockpage(BtLockWrite, set->latch);
        bt_unpinlatch (mgr, set->latch);
        return 0;
 }
@@ -2172,9 +2263,13 @@ uint nxt = mgr->page_size;
 unsigned char value[BtId];
 BtPageSet left[1];
 uid left_page_no;
+BtPage frame;
 BtKey *ptr;
 BtVal *val;
 
+       frame = malloc (mgr->page_size);
+       memcpy (frame, root->page, mgr->page_size);
+
        //      save left page fence key for new root
 
        ptr = keyptr(root->page, root->page->cnt);
@@ -2183,11 +2278,12 @@ BtVal *val;
        //  Obtain an empty page to use, and copy the current
        //  root contents into it, e.g. lower keys
 
-       if( bt_newpage(mgr, left, root->page, page_no) )
+       if( bt_newpage(mgr, left, frame, page_no) )
                return mgr->err;
 
        left_page_no = left->latch->page_no;
        bt_unpinlatch (mgr, left->latch);
+       free (frame);
 
        // preserve the page info at the bottom
        // of higher keys and set rest to zero
@@ -2243,7 +2339,7 @@ BtVal *val;
 //  split already locked full node
 //     leave it locked.
 //     return pool entry for new right
-//     page, unlocked
+//     page, pinned & unlocked
 
 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
 {
@@ -2352,13 +2448,14 @@ uint prev;
        bt_putid(set->page->right, right->latch->page_no);
        set->page->min = nxt;
        set->page->cnt = idx;
+       free(frame);
 
-       return right->latch->entry;
+       return right->latch - mgr->latchsets;
 }
 
 //     fix keys for newly split page
-//     call with page locked, return
-//     unlocked
+//     call with both pages pinned & locked
+//  return unlocked and unpinned
 
 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
 {
@@ -2422,6 +2519,7 @@ uint idx, librarian;
 BtSlot *node;
 BtKey *ptr;
 BtVal *val;
+int rate;
 
        //      if found slot > desired slot and previous slot
        //      is a librarian slot, use it
@@ -2450,27 +2548,52 @@ BtVal *val;
          if( slotptr(set->page, idx)->dead )
                break;
 
-       // now insert key into array before slot
+       // now insert key into array before slot,
+       //      adding as many librarian slots as
+       //      makes sense.
 
-       if( idx == set->page->cnt )
-               idx += 2, set->page->cnt += 2, librarian = 2;
-       else
-               librarian = 1;
+       if( idx == set->page->cnt ) {
+       int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
 
-       set->latch->dirty = 1;
-       set->page->act++;
+               librarian = ++idx - slot;
+               avail /= sizeof(BtSlot);
 
-       while( idx > slot + librarian - 1 )
-               *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
+               if( avail < 0 )
+                       avail = 0;
 
-       //      add librarian slot
+               if( librarian > avail )
+                       librarian = avail;
 
-       if( librarian > 1 ) {
-               node = slotptr(set->page, slot++);
-               node->off = set->page->min;
-               node->type = Librarian;
-               node->dead = 1;
+               if( librarian ) {
+                       rate = (idx - slot) / librarian;
+                       set->page->cnt += librarian;
+                       idx += librarian;
+               } else
+                       rate = 0;
+       } else
+               librarian = 0, rate = 0;
+
+       while( idx > slot ) {
+               //  transfer slot
+               *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
+               idx--;
+
+               //      add librarian slot per rate
+
+               if( librarian )
+                if( (idx - slot + 1)/2 <= librarian * rate ) {
+//               if( rate && !(idx % rate) ) {
+                       node = slotptr(set->page, idx--);
+                       node->off = node[1].off;
+                       node->type = Librarian;
+                       node->dead = 1;
+                       librarian--;
+                 }
        }
+if(librarian)
+abort();
+       set->latch->dirty = 1;
+       set->page->act++;
 
        //      fill in new slot
 
@@ -2594,23 +2717,6 @@ BtVal *val;
   return 0;
 }
 
-typedef struct {
-       logseqno reqlsn;        // redo log seq no required
-       logseqno lsn;           // redo log sequence number
-       uint entry;                     // latch table entry number
-       uint slot:31;           // page slot number
-       uint reuse:1;           // reused previous page
-} AtomicTxn;
-
-typedef struct {
-       uid page_no;            // page number for split leaf
-       void *next;                     // next key to insert
-       uint entry:29;          // latch table entry number
-       uint type:2;            // 0 == insert, 1 == delete, 2 == free
-       uint nounlock:1;        // don't unlock ParentModification
-       unsigned char leafkey[BT_keyarray];
-} AtomicKey;
-
 //     determine actual page where key is located
 //  return slot number
 
@@ -2653,7 +2759,7 @@ uint entry;
        return 0;
 }
 
-BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
+BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
 {
 BtKey *key = keyptr(source, src);
 BtVal *val = valptr(source, src);
@@ -2665,17 +2771,19 @@ uint entry, slot;
        if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
          if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
                return mgr->err;
-         set->page->lsn = locks[src].lsn;
+         set->page->lsn = lsn;
          return 0;
        }
 
+       //  split page
+
        if( entry = bt_splitpage (mgr, set, thread_no) )
          latch = mgr->latchsets + entry;
        else
          return mgr->err;
 
        //      splice right page into split chain
-       //      and WriteLock it.
+       //      and WriteLock it
 
        bt_lockpage(BtLockWrite, latch, thread_no);
        latch->split = set->latch->split;
@@ -2689,7 +2797,7 @@ uint entry, slot;
 //     perform delete from smaller btree
 //  insert a delete slot if not found there
 
-BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
+BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
 {
 BtKey *key = keyptr(source, src);
 BtPageSet set[1];
@@ -2718,131 +2826,35 @@ BtVal *val;
 
        set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
        set->latch->dirty = 1;
-       set->page->lsn = locks[src].lsn;
+       set->page->lsn = lsn;
        set->page->act--;
 
        node->dead = 0;
-       mgr->found++;
+       __sync_fetch_and_add(&mgr->found, 1);
        return 0;
 }
 
-//     delete an empty master page for a transaction
-
-//     note that the far right page never empties because
-//     it always contains (at least) the infinite stopper key
-//     and that all pages that don't contain any keys are
-//     deleted, or are being held under Atomic lock.
-
-BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
+int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
 {
-BtPageSet right[1], temp[1];
-unsigned char value[BtId];
-uid right_page_no;
-BtKey *ptr;
-
-       bt_lockpage(BtLockWrite, prev->latch, thread_no);
-
-       //      grab the right sibling
-
-       if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
-               right->page = bt_mappage (mgr, right->latch);
-       else
-               return mgr->err;
-
-       bt_lockpage(BtLockAtomic, right->latch, thread_no);
-       bt_lockpage(BtLockWrite, right->latch, thread_no);
+BtKey *key1 = (BtKey *)((char *)page + slot1->off);
+BtKey *key2 = (BtKey *)((char *)page + slot2->off);
 
-       //      and pull contents over empty page
-       //      while preserving master's left link
-
-       memcpy (right->page->left, prev->page->left, BtId);
-       memcpy (prev->page, right->page, mgr->page_size);
-
-       //      forward seekers to old right sibling
-       //      to new page location in set
-
-       bt_putid (right->page->right, prev->latch->page_no);
-       right->latch->dirty = 1;
-       right->page->kill = 1;
-
-       //      remove pointer to right page for searchers by
-       //      changing right fence key to point to the master page
-
-       ptr = keyptr(right->page,right->page->cnt);
-       bt_putid (value, prev->latch->page_no);
-
-       if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
-               return mgr->err;
-
-       //  now that master page is in good shape we can
-       //      remove its locks.
-
-       bt_unlockpage (BtLockAtomic, prev->latch);
-       bt_unlockpage (BtLockWrite, prev->latch);
-
-       //  fix master's right sibling's left pointer
-       //      to remove scanner's poiner to the right page
-
-       if( right_page_no = bt_getid (prev->page->right) ) {
-         if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
-               temp->page = bt_mappage (mgr, temp->latch);
-         else
-               return mgr->err;
-
-         bt_lockpage (BtLockWrite, temp->latch, thread_no);
-         bt_putid (temp->page->left, prev->latch->page_no);
-         temp->latch->dirty = 1;
-
-         bt_unlockpage (BtLockWrite, temp->latch);
-         bt_unpinlatch (mgr, temp->latch);
-       } else {        // master is now the far right page
-         bt_mutexlock (mgr->lock);
-         bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
-         bt_releasemutex(mgr->lock);
-       }
-
-       //      now that there are no pointers to the right page
-       //      we can delete it after the last read access occurs
-
-       bt_unlockpage (BtLockWrite, right->latch);
-       bt_unlockpage (BtLockAtomic, right->latch);
-       bt_lockpage (BtLockDelete, right->latch, thread_no);
-       bt_lockpage (BtLockWrite, right->latch, thread_no);
-       bt_freepage (mgr, right);
-       return 0;
+       return keycmp (key1, key2->key, key2->len);
 }
-
 //     atomic modification of a batch of keys.
 
-//     return -1 if BTERR is set
-//     otherwise return slot number
-//     causing the key constraint violation
-//     or zero on successful completion.
-
 BTERR bt_atomictxn (BtDb *bt, BtPage source)
 {
 uint src, idx, slot, samepage, entry, que = 0;
-AtomicKey *head, *tail, *leaf;
-BtPageSet set[1], prev[1];
-unsigned char value[BtId];
 BtKey *key, *ptr, *key2;
-BtLatchSet *latch;
-AtomicTxn *locks;
 int result = 0;
 BtSlot temp[1];
 logseqno lsn;
-BtPage page;
-BtVal *val;
-uid right;
 int type;
 
-  locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
-  head = NULL;
-  tail = NULL;
-
   // stable sort the list of keys into order to
   //   prevent deadlocks between threads.
-
+/*
   for( src = 1; src++ < source->cnt; ) {
        *temp = *slotptr(source,src);
        key = keyptr (source,src);
@@ -2856,15 +2868,47 @@ int type;
                break;
        }
   }
-
+*/
+       qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
   //  add entries to redo log
 
-  if( bt->mgr->redopages )
-   if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
-    for( src = 0; src++ < source->cnt; )
-        locks[src].lsn = lsn;
-    else
-        return bt->mgr->err;
+  if( bt->mgr->pagezero->redopages )
+       lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
+  else
+       lsn = 0;
+
+  //  perform the individual actions in the transaction
+
+  if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
+       return bt->mgr->err;
+
+  // if number of active pages
+  // is greater than the buffer pool
+  // promote page into larger btree
+
+  if( bt->main )
+   while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
+       if( bt_txnpromote (bt) )
+         return bt->mgr->err;
+
+  // return success
+
+  return 0;
+}
+
+BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
+{
+uint src, idx, slot, samepage, entry, que = 0;
+BtPageSet set[1], prev[1];
+unsigned char value[BtId];
+BtLatchSet *latch;
+AtomicTxn *locks;
+BtKey *key, *ptr;
+BtPage page;
+BtVal *val;
+uid right;
+
+  locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
 
   // Load the leaf page for each key
   // group same page references with reuse bit
@@ -2881,14 +2925,12 @@ int type;
        if( samepage = src > 1 )
          if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
                slot = bt_findslot(set->page, key->key, key->len);
-         else
-               bt_unlockpage(BtLockRead, set->latch); 
 
        if( !slot )
-         if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
+         if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic, thread_no) )
                set->latch->split = 0;
          else
-               return bt->mgr->err;
+               return mgr->err;
 
        if( slotptr(set->page, slot)->type == Librarian )
          ptr = keyptr(set->page, ++slot);
@@ -2896,7 +2938,7 @@ int type;
          ptr = keyptr(set->page, slot);
 
        if( !samepage ) {
-         locks[src].entry = set->latch->entry;
+         locks[src].entry = set->latch - mgr->latchsets;
          locks[src].slot = slot;
          locks[src].reuse = 0;
        } else {
@@ -2910,29 +2952,18 @@ int type;
        locks[src].reqlsn = set->page->lsn;
   }
 
-  //  unlock last loadpage lock
-
-  if( source->cnt )
-       bt_unlockpage(BtLockRead, set->latch);
-
   //  obtain write lock for each master page
-  //  sync flushed pages to disk
 
   for( src = 0; src++ < source->cnt; ) {
        if( locks[src].reuse )
          continue;
 
-       set->latch = bt->mgr->latchsets + locks[src].entry;
-       bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
+       set->latch = mgr->latchsets + locks[src].entry;
+       bt_lockpage (BtLockWrite, set->latch, thread_no);
   }
 
   // insert or delete each key
   // process any splits or merges
-  // release Write & Atomic latches
-  // set ParentModifications and build
-  // queue of keys to insert for split pages
-  // or delete for deleted pages.
-
   // run through txn list backwards
 
   samepage = source->cnt + 1;
@@ -2948,227 +2979,158 @@ int type;
        for( idx = src; idx < samepage; idx++ )
         switch( slotptr(source,idx)->type ) {
         case Delete:
-         if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
-               return bt->mgr->err;
+         if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
+               return mgr->err;
          break;
 
        case Duplicate:
        case Unique:
-         if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
-               return bt->mgr->err;
+         if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
+               return mgr->err;
+         break;
+
+       default:
+         bt_atomicpage (mgr, source, locks, idx, set);
          break;
        }
 
        //      after the same page operations have finished,
        //  process master page for splits or deletion.
 
-       latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
-       prev->page = bt_mappage (bt->mgr, prev->latch);
+       latch = prev->latch = mgr->latchsets + locks[src].entry;
+       prev->page = bt_mappage (mgr, prev->latch);
        samepage = src;
 
        //  pick-up all splits from master page
-       //      each one is already WriteLocked.
-
-       entry = prev->latch->split;
+       //      each one is already pinned & WriteLocked.
 
-       while( entry ) {
-         set->latch = bt->mgr->latchsets + entry;
-         set->page = bt_mappage (bt->mgr, set->latch);
-         entry = set->latch->split;
+       if( entry = prev->latch->split ) do {
+         set->latch = mgr->latchsets + entry;
+         set->page = bt_mappage (mgr, set->latch);
 
          // delete empty master page by undoing its split
          //  (this is potentially another empty page)
-         //  note that there are no new left pointers yet
+         //  note that there are no pointers to it yet
 
          if( !prev->page->act ) {
                memcpy (set->page->left, prev->page->left, BtId);
-               memcpy (prev->page, set->page, bt->mgr->page_size);
-               bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
-               bt_freepage (bt->mgr, set);
-
+               memcpy (prev->page, set->page, mgr->page_size);
+               bt_lockpage (BtLockDelete, set->latch, thread_no);
                prev->latch->split = set->latch->split;
                prev->latch->dirty = 1;
+               bt_freepage (mgr, set);
                continue;
          }
 
-         // remove empty page from the split chain
-         // and return it to the free list.
+         // remove empty split page from the split chain
+         // and return it to the free list. No other
+         // thread has its page number yet.
 
          if( !set->page->act ) {
                memcpy (prev->page->right, set->page->right, BtId);
                prev->latch->split = set->latch->split;
-               bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
-               bt_freepage (bt->mgr, set);
+
+               bt_lockpage (BtLockDelete, set->latch, thread_no);
+               bt_freepage (mgr, set);
                continue;
          }
 
-         //  schedule prev fence key update
+         //  update prev's fence key
 
          ptr = keyptr(prev->page,prev->page->cnt);
-         leaf = calloc (sizeof(AtomicKey), 1), que++;
-
-         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
-         leaf->page_no = prev->latch->page_no;
-         leaf->entry = prev->latch->entry;
-         leaf->type = 0;
+         bt_putid (value, prev->latch->page_no);
 
-         if( tail )
-               tail->next = leaf;
-         else
-               head = leaf;
-
-         tail = leaf;
+         if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
+               return mgr->err;
 
          // splice in the left link into the split page
 
          bt_putid (set->page->left, prev->latch->page_no);
-         bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
+
+         if( lsm )
+               bt_syncpage (mgr, prev->page, prev->latch);
+
+         // page is unpinned below to avoid bt_txnpromote
+
          bt_unlockpage(BtLockWrite, prev->latch);
          *prev = *set;
-       }
+       } while( entry = prev->latch->split );
 
        //  update left pointer in next right page from last split page
-       //      (if all splits were reversed, latch->split == 0)
+       //      (if all splits were reversed or none occurred, latch->split == 0)
 
        if( latch->split ) {
          //  fix left pointer in master's original (now split)
          //  far right sibling or set rightmost page in page zero
 
          if( right = bt_getid (prev->page->right) ) {
-               if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
-                 set->page = bt_mappage (bt->mgr, set->latch);
+               if( set->latch = bt_pinlatch (mgr, right, NULL, thread_no) )
+                 set->page = bt_mappage (mgr, set->latch);
                else
-                 return bt->mgr->err;
+                 return mgr->err;
 
-           bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
+           bt_lockpage (BtLockWrite, set->latch, thread_no);
            bt_putid (set->page->left, prev->latch->page_no);
                set->latch->dirty = 1;
+
            bt_unlockpage (BtLockWrite, set->latch);
-               bt_unpinlatch (bt->mgr, set->latch);
+               bt_unpinlatch (mgr, set->latch);
          } else {      // prev is rightmost page
-           bt_mutexlock (bt->mgr->lock);
-               bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
-           bt_releasemutex(bt->mgr->lock);
+           bt_mutexlock (mgr->lock);
+               bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
+           bt_releasemutex(mgr->lock);
          }
 
          //  Process last page split in chain
+         //  by switching the key from the master
+         //  page to the last split.
 
          ptr = keyptr(prev->page,prev->page->cnt);
-         leaf = calloc (sizeof(AtomicKey), 1), que++;
+         bt_putid (value, prev->latch->page_no);
 
-         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
-         leaf->page_no = prev->latch->page_no;
-         leaf->entry = prev->latch->entry;
-         leaf->type = 0;
-  
-         if( tail )
-               tail->next = leaf;
-         else
-               head = leaf;
-
-         tail = leaf;
+         if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
+               return mgr->err;
 
-         bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
          bt_unlockpage(BtLockWrite, prev->latch);
 
-         //  remove atomic lock on master page
+         if( lsm )
+               bt_syncpage (mgr, prev->page, prev->latch);
 
          bt_unlockpage(BtLockAtomic, latch);
+         bt_unpinlatch(mgr, latch);
+
+         // go through the list of splits and
+         // release the latch pins
+
+         while( entry = latch->split ) {
+               latch = mgr->latchsets + entry;
+               bt_unpinlatch(mgr, latch);
+         }
+
          continue;
        }
 
-       //  finished if prev page occupied (either master or final split)
+       //      since there are no splits, we're
+       //  finished if master page occupied
 
        if( prev->page->act ) {
-         bt_unlockpage(BtLockWrite, latch);
-         bt_unlockpage(BtLockAtomic, latch);
-         bt_unpinlatch(bt->mgr, latch);
+         bt_unlockpage(BtLockAtomic, prev->latch);
+         bt_unlockpage(BtLockWrite, prev->latch);
+
+         if( lsm )
+               bt_syncpage (mgr, prev->page, prev->latch);
+
+         bt_unpinlatch(mgr, prev->latch);
          continue;
        }
 
        // any and all splits were reversed, and the
        // master page located in prev is empty, delete it
-       // by pulling over master's right sibling.
-
-       // Remove empty master's fence key
-
-       ptr = keyptr(prev->page,prev->page->cnt);
-
-       if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
-               return bt->mgr->err;
-
-       //      perform the remainder of the delete
-       //      from the FIFO queue
 
-       leaf = calloc (sizeof(AtomicKey), 1), que++;
-
-       memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
-       leaf->page_no = prev->latch->page_no;
-       leaf->entry = prev->latch->entry;
-       leaf->nounlock = 1;
-       leaf->type = 2;
-  
-       if( tail )
-         tail->next = leaf;
-       else
-         head = leaf;
-
-       tail = leaf;
-
-       //      leave atomic lock in place until
-       //      deletion completes in next phase.
-
-       bt_unlockpage(BtLockWrite, prev->latch);
+       if( bt_deletepage (mgr, prev, thread_no, 1) )
+               return mgr->err;
   }
 
-  //  add & delete keys for any pages split or merged during transaction
-
-  if( leaf = head )
-    do {
-         set->latch = bt->mgr->latchsets + leaf->entry;
-         set->page = bt_mappage (bt->mgr, set->latch);
-
-         bt_putid (value, leaf->page_no);
-         ptr = (BtKey *)leaf->leafkey;
-
-         switch( leaf->type ) {
-         case 0:       // insert key
-           if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) )
-                 return bt->mgr->err;
-
-               break;
-
-         case 1:       // delete key
-               if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
-                 return bt->mgr->err;
-
-               break;
-
-         case 2:       // free page
-               if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
-                 return bt->mgr->err;
-
-               break;
-         }
-
-         if( !leaf->nounlock )
-           bt_unlockpage (BtLockParent, set->latch);
-
-         bt_unpinlatch (bt->mgr, set->latch);
-         tail = leaf->next;
-         free (leaf);
-       } while( leaf = tail );
-
-  // if number of active pages
-  // is greater than the buffer pool
-  // promote page into larger btree
-
-  while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
-       if( bt_txnpromote (bt) )
-         return bt->mgr->err;
-
-  // return success
-
   free (locks);
   return 0;
 }
@@ -3195,21 +3157,14 @@ BtVal *val;
                continue;
 
        set->latch = bt->mgr->latchsets + entry;
-       idx = set->latch->page_no % bt->mgr->latchhash;
 
        if( !bt_mutextry(set->latch->modify) )
                continue;
 
-//    if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
-//             bt_releasemutex(set->latch->modify);
-//             continue;
-//     }
-
        //  skip this entry if it is pinned
 
        if( set->latch->pin & ~CLOCK_bit ) {
          bt_releasemutex(set->latch->modify);
-//      bt_releasemutex(bt->mgr->hashtable[idx].latch);
          continue;
        }
 
@@ -3219,70 +3174,51 @@ BtVal *val;
 
        if( !set->latch->page_no || !bt_getid (set->page->right) ) {
          bt_releasemutex(set->latch->modify);
-//      bt_releasemutex(bt->mgr->hashtable[idx].latch);
          continue;
        }
 
-       bt_lockpage (BtLockAccess, set->latch, bt->thread_no);
-       bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
-    bt_unlockpage(BtLockAccess, set->latch);
-
-       // entry interiour node or being or was killed
+       // entry interiour node or being killed or promoted
 
        if( set->page->lvl || set->page->free || set->page->kill ) {
          bt_releasemutex(set->latch->modify);
-//      bt_releasemutex(bt->mgr->hashtable[idx].latch);
-      bt_unlockpage(BtLockWrite, set->latch);
          continue;
        }
 
        //  pin the page for our useage
 
        set->latch->pin++;
+       set->latch->promote = 1;
        bt_releasemutex(set->latch->modify);
-//    bt_releasemutex(bt->mgr->hashtable[idx].latch);
-
-       //      if page is dirty, then
-       //      sync it to the disk first.
 
-       if( set->latch->dirty )
-        if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) )
-         return bt->mgr->line = __LINE__, bt->mgr->err;
-        else
-         set->latch->dirty = 0;
+       bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
 
-       // transfer slots in our selected page to larger btree
-if( !(set->latch->page_no % 100) )
-fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt);
+       //  remove the key for the page
+       //      and wait for other threads to fade away
 
-       for( slot = 0; slot++ < set->page->cnt; ) {
-               ptr = keyptr (set->page, slot);
-               val = valptr (set->page, slot);
-               node = slotptr(set->page, slot);
+       ptr = keyptr(set->page, set->page->cnt);
 
-               switch( node->type ) {
-               case Duplicate:
-               case Unique:
-                 if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) )
-                       return bt->main->err;
+       if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
+               return bt->mgr->err;
 
-                 continue;
+       bt_unlockpage (BtLockWrite, set->latch);
+while( (set->latch->pin & ~CLOCK_bit) > 1 )
+sched_yield();
+       bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
+       bt_lockpage (BtLockAtomic, set->latch, bt->thread_no);
+       bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
 
-               case Delete:
-                 if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) )
-                       return bt->main->err;
+       // transfer slots in our selected page to larger btree
+if( !(set->latch->page_no % 100) )
+fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
 
-                 continue;
-               }
-       }
+       if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
+               return bt->main->err;
 
        //  now delete the page
 
-       if( bt_deletepage (bt->mgr, set, bt->thread_no) )
-               return bt->mgr->err;
-
-       bt_unpinlatch (bt->mgr, set->latch);
-       return 0;
+       bt_unlockpage (BtLockDelete, set->latch);
+       bt_unlockpage (BtLockAtomic, set->latch);
+       return bt_deletepage (bt->mgr, set, bt->thread_no, 0);
   }
 }
 
@@ -3698,7 +3634,7 @@ FILE *in;
                posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
 #endif
                fprintf(stderr, "started counting\n");
-               next = LEAF_page + bt->mgr->redopages + 1;
+               next = REDO_page + bt->mgr->pagezero->redopages;
 
                while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
                        if( bt_readpage (bt->mgr, bt->cursor, page_no) )
@@ -3716,7 +3652,6 @@ FILE *in;
                break;
        }
 
-       fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found);
        bt_close (bt);
 #ifdef unix
        return NULL;
@@ -3738,8 +3673,8 @@ pthread_t *threads;
 HANDLE *threads;
 #endif
 ThreadArg *args;
+uint redopages = 0;
 uint poolsize = 0;
-uint recovery = 0;
 uint mainpool = 0;
 uint mainbits = 0;
 float elapsed;
@@ -3779,7 +3714,10 @@ BtKey *ptr;
                num = atoi(argv[6]);
 
        if( argc > 7 )
-               recovery = atoi(argv[7]);
+               redopages = atoi(argv[7]);
+
+       if( redopages + REDO_page > 65535 )
+               fprintf (stderr, "Warning: Recovery buffer too large\n");
 
        if( argc > 8 )
                mainbits = atoi(argv[8]);
@@ -3795,19 +3733,22 @@ BtKey *ptr;
 #endif
        args = malloc ((cnt + 1) * sizeof(ThreadArg));
 
-       mgr = bt_mgr (argv[1], bits, poolsize, recovery);
+       mgr = bt_mgr (argv[1], bits, poolsize, redopages);
 
        if( !mgr ) {
                fprintf(stderr, "Index Open Error %s\n", argv[1]);
                exit (1);
        }
 
-       main = bt_mgr (argv[2], mainbits, mainpool, 0);
+       if( mainbits ) {
+               main = bt_mgr (argv[2], mainbits, mainpool, 0);
 
-       if( !main ) {
-               fprintf(stderr, "Index Open Error %s\n", argv[2]);
-               exit (1);
-       }
+               if( !main ) {
+                       fprintf(stderr, "Index Open Error %s\n", argv[2]);
+                       exit (1);
+               }
+       } else
+               main = NULL;
 
        //      fire off threads
 
@@ -3848,9 +3789,14 @@ BtKey *ptr;
                CloseHandle(threads[idx]);
 #endif
        bt_poolaudit(mgr);
-       bt_poolaudit(main);
 
-       bt_mgrclose (main);
+       if( main )
+               bt_poolaudit(main);
+
+       fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
+
+       if( main )
+               bt_mgrclose (main);
        bt_mgrclose (mgr);
 
        elapsed = getCpuTime(0) - start;