]> pd.if.org Git - btree/commitdiff
Progress made on LSM B-tree
authorunknown <karl@E04.petzent.com>
Sun, 12 Oct 2014 15:05:10 +0000 (08:05 -0700)
committerunknown <karl@E04.petzent.com>
Sun, 12 Oct 2014 15:05:10 +0000 (08:05 -0700)
threadskv10.c

index c524a48ae50ac7d3d92cbdc6c399347a9ecf6822..129434fda0f8b8756ca60492ac578a408c3cb71d 100644 (file)
@@ -1,4 +1,4 @@
-// btree version threadskv10 FUTEX version
+// btree version threadskv10 futex version
 //     with reworked bt_deletekey code,
 //     phase-fair reader writer lock,
 //     librarian page split code,
@@ -7,9 +7,9 @@
 //     traditional buffer pool manager
 //     ACID batched key-value updates
 //     redo log for failure recovery
-//     and dual B-trees for write optimization
+//     and LSM B-trees for write optimization
 
-// 09 OCT 2014
+// 11 OCT 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -110,6 +110,41 @@ typedef enum{
        BtLockAtomic = 32
 } BtLock;
 
+//     lite weight spin latch
+
+volatile typedef struct {
+       ushort exclusive:1;
+       ushort pending:1;
+       ushort share:14;
+} BtMutexLatch;
+
+#define XCL 1
+#define PEND 2
+#define BOTH 3
+#define SHARE 4
+
+/*
+// exclusive is set for write access
+
+volatile typedef struct {
+       unsigned char exclusive[1];
+       unsigned char filler;
+} BtMutexLatch;
+*/
+/*
+typedef struct {
+  union {
+       struct {
+         volatile uint xlock:1;        // one writer has exclusive lock
+         volatile uint wrt:31;         // count of other writers waiting
+       } bits[1];
+       uint value[1];
+  };
+} BtMutexLatch;
+*/
+#define XCL 1
+#define WRT 2
+
 //     definition for phase-fair reader/writer lock implementation
 
 typedef struct {
@@ -117,16 +152,16 @@ typedef struct {
        volatile ushort rout[1];
        volatile ushort ticket[1];
        volatile ushort serving[1];
-       ushort tid;
-       ushort dup;
+       volatile ushort tid[1];
+       volatile ushort dup[1];
 } RWLock;
 
 //     write only lock
 
 typedef struct {
-       volatile uint exclusive[1];
-       ushort tid;
-       ushort dup;
+       BtMutexLatch xcl[1];
+       volatile ushort tid[1];
+       volatile ushort dup[1];
 } WOLock;
 
 #define PHID 0x1
@@ -134,16 +169,6 @@ typedef struct {
 #define MASK 0x3
 #define RINC 0x4
 
-//     lite weight mutex
-
-// exclusive is set for write access
-
-typedef struct {
-       volatile uint exclusive[1];
-} BtMutexLatch;
-
-#define XCL 1
-
 //     mode & definition for lite latch implementation
 
 enum {
@@ -172,7 +197,6 @@ typedef struct {
        uint prev;                              // prev entry in hash table chain
        ushort pin;                             // number of accessing threads
        unsigned char dirty;    // page in cache is dirty (atomic set)
-       unsigned char avail;    // page is an available entry
        BtMutexLatch modify[1]; // modify entry lite latch
 } BtLatchSet;
 
@@ -203,8 +227,7 @@ typedef enum {
        Unique,
        Librarian,
        Duplicate,
-       Delete,
-       Update
+       Delete
 } BtSlotType;
 
 typedef struct {
@@ -267,9 +290,10 @@ typedef struct {
 //     structure for latch manager on ALLOC_page
 
 typedef struct {
-       struct BtPage_ alloc[1];        // next page_no in right ptr
-       unsigned long long dups[1];     // global duplicate key uniqueifier
-       unsigned char chain[BtId];      // head of free page_nos chain
+       struct BtPage_ alloc[1];                // next page_no in right ptr
+       unsigned long long dups[1];             // global duplicate key uniqueifier
+       unsigned char freechain[BtId];  // head of free page_nos chain
+       unsigned long long activepages; // number of active pages pages
 } BtPageZero;
 
 //     The object structure for Btree access
@@ -288,18 +312,17 @@ typedef struct {
        unsigned char *pagepool;        // mapped to the buffer pool pages
        unsigned char *redobuff;        // mapped recovery buffer pointer
        logseqno lsn, flushlsn;         // current & first lsn flushed
-       BtMutexLatch dump[1];           // redo dump lite latch
        BtMutexLatch redo[1];           // redo area lite latch
        BtMutexLatch lock[1];           // allocation area lite latch
+       BtMutexLatch maps[1];           // mapping segments lite latch
        ushort thread_no[1];            // next thread number
        uint nlatchpage;                        // number of latch pages at BT_latch
        uint latchtotal;                        // number of page latch entries
        uint latchhash;                         // number of latch hash table slots
        uint latchvictim;                       // next latch entry to examine
-       uint latchavail;                        // next available latch entry
-       uint availlock[1];                      // latch available commitments
-       uint available;                         // number of available latches
+       uint latchpromote;                      // next latch entry to promote
        uint redopages;                         // size of recovery buff in pages
+       uint redolast;                          // last msync size of recovery buff
        uint redoend;                           // eof/end element in recovery buff
        int err;                                        // last error
        int line;                                       // last error line no
@@ -309,6 +332,8 @@ typedef struct {
        HANDLE halloc;                          // allocation handle
        HANDLE hpool;                           // buffer pool handle
 #endif
+       uint segments;                          // number of memory mapped segments
+       unsigned char *pages[64000];// memory mapped segments of b-tree
 } BtMgr;
 
 typedef struct {
@@ -330,8 +355,7 @@ typedef enum {
        BTERR_read,
        BTERR_wrt,
        BTERR_atomic,
-       BTERR_recovery,
-       BTERR_avail
+       BTERR_recovery
 } BTERR;
 
 #define CLOCK_bit 0x8000
@@ -363,11 +387,11 @@ typedef struct {
 
 extern void bt_close (BtDb *bt);
 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
-extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
+extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit);
 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
-extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update, ushort thread_no);
+extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
 
 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
@@ -383,6 +407,9 @@ extern void bt_mgrclose (BtMgr *mgr);
 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
 
+//     transaction functions
+BTERR bt_txnpromote (BtDb *bt);
+
 //  The page is allocated from low and hi ends.
 //  The key slots are allocated from the bottom,
 //     while the text and value of the key
@@ -461,47 +488,170 @@ uid bt_newdup (BtMgr *mgr)
 #endif
 }
 
-//     Write-Only Queue Lock
+//     lite weight spin lock Latch Manager
 
-void WriteOLock (WOLock *lock, ushort tid)
+int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
 {
-uint prev;
+       return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+}
+
+void bt_mutexlock(BtMutexLatch *latch)
+{
+ushort prev;
 
-       if( lock->tid == tid ) {
-               lock->dup++;
+  do {
+#ifdef  unix
+       prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
+#else
+       prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
+#endif
+       if( !(prev & XCL) )
+         if( !(prev & ~BOTH) )
+               return;
+         else
+#ifdef unix
+               __sync_fetch_and_and ((ushort *)latch, ~XCL);
+#else
+               _InterlockedAnd16((ushort *)latch, ~XCL);
+#endif
+#ifdef  unix
+  } while( sched_yield(), 1 );
+#else
+  } while( SwitchToThread(), 1 );
+#endif
+/*
+unsigned char prev;
+
+  do {
+#ifdef  unix
+       prev = __sync_fetch_and_or(latch->exclusive, XCL);
+#else
+       prev = _InterlockedOr8(latch->exclusive, XCL);
+#endif
+       if( !(prev & XCL) )
+               return;
+#ifdef  unix
+  } while( sched_yield(), 1 );
+#else
+  } while( SwitchToThread(), 1 );
+#endif
+*/
+/*
+BtMutexLatch prev[1];
+uint slept = 0;
+
+  while( 1 ) {
+       *prev->value = __sync_fetch_and_or(latch->value, XCL);
+
+       if( !prev->bits->xlock ) {                      // did we set XCL bit?
+           if( slept )
+                 __sync_fetch_and_sub(latch->value, WRT);
                return;
        }
 
-       while( 1 ) {
+       if( !slept ) {
+               prev->bits->wrt++;
+               __sync_fetch_and_add(latch->value, WRT);
+       }
+
+       sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
+       slept = 1;
+  } */
+}
+
+//     try to obtain write lock
+
+//     return 1 if obtained,
+//             0 otherwise
+
+int bt_mutextry(BtMutexLatch *latch)
+{
+ushort prev;
+
+#ifdef  unix
+       prev = __sync_fetch_and_or((ushort *)latch, XCL);
+#else
+       prev = _InterlockedOr16((ushort *)latch, XCL);
+#endif
+       //      take write access if all bits are clear
+
+       if( !(prev & XCL) )
+         if( !(prev & ~BOTH) )
+               return 1;
+         else
 #ifdef unix
-         prev = __sync_fetch_and_or (lock->exclusive, 1);
+               __sync_fetch_and_and ((ushort *)latch, ~XCL);
 #else
-         prev = _InterlockedExchangeOr (lock->exclusive, 1);
+               _InterlockedAnd16((ushort *)latch, ~XCL);
 #endif
-         if( !(prev & XCL) ) {
-               lock->tid = tid;
-               return;
-         }
+       return 0;
+/*
+unsigned char prev;
+
+#ifdef  unix
+       prev = __sync_fetch_and_or(latch->exclusive, XCL);
+#else
+       prev = _InterlockedOr8(latch->exclusive, XCL);
+#endif
+       //      take write access if all bits are clear
+
+       return !(prev & XCL);
+*/
+/*
+BtMutexLatch prev[1];
+
+       *prev->value = __sync_fetch_and_or(latch->value, XCL);
+
+       //      take write access if exclusive bit is clear
+
+       return !prev->bits->xlock;
+*/
+}
+
+//     clear write mode
+
+void bt_releasemutex(BtMutexLatch *latch)
+{
 #ifdef unix
-         sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
+       __sync_fetch_and_and((ushort *)latch, ~BOTH);
 #else
-         SwitchToThread ();
+       _InterlockedAnd16((ushort *)latch, ~BOTH);
 #endif
+/*
+       *latch->exclusive = 0;
+*/
+/*
+BtMutexLatch prev[1];
+
+       *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
+
+       if( prev->bits->wrt )
+         sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
+*/
+}
+
+//     Write-Only Queue Lock
+
+void WriteOLock (WOLock *lock, ushort tid)
+{
+       if( *lock->tid == tid ) {
+               *lock->dup += 1;
+               return;
        }
+
+       bt_mutexlock(lock->xcl);
+       *lock->tid = tid;
 }
 
 void WriteORelease (WOLock *lock)
 {
-       if( lock->dup ) {
-               lock->dup--;
+       if( *lock->dup ) {
+               *lock->dup -= 1;
                return;
        }
 
-       *lock->exclusive = 0;
-       lock->tid = 0;
-#ifdef linux
-       sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
-#endif
+       *lock->tid = 0;
+       bt_releasemutex(lock->xcl);
 }
 
 //     Phase-Fair reader/writer lock implementation
@@ -510,8 +660,8 @@ void WriteLock (RWLock *lock, ushort tid)
 {
 ushort w, r, tix;
 
-       if( lock->tid == tid ) {
-               lock->dup++;
+       if( *lock->tid == tid ) {
+               *lock->dup += 1;
                return;
        }
 #ifdef unix
@@ -540,17 +690,17 @@ ushort w, r, tix;
 #else
                SwitchToThread();
 #endif
-       lock->tid = tid;
+       *lock->tid = tid;
 }
 
 void WriteRelease (RWLock *lock)
 {
-       if( lock->dup ) {
-               lock->dup--;
+       if( *lock->dup ) {
+               *lock->dup -= 1;
                return;
        }
 
-       lock->tid = 0;
+       *lock->tid = 0;
 #ifdef unix
        __sync_fetch_and_and (lock->rin, ~MASK);
 #else
@@ -568,8 +718,8 @@ ushort w;
 
        //  OK if write lock already held by same thread
 
-       if( lock->tid == tid ) {
-               lock->dup++;
+       if( *lock->tid == tid ) {
+               *lock->dup += 1;
                return 1;
        }
 #ifdef unix
@@ -593,8 +743,9 @@ ushort w;
 void ReadLock (RWLock *lock, ushort tid)
 {
 ushort w;
-       if( lock->tid == tid ) {
-               lock->dup++;
+
+       if( *lock->tid == tid ) {
+               *lock->dup += 1;
                return;
        }
 #ifdef unix
@@ -613,8 +764,8 @@ ushort w;
 
 void ReadRelease (RWLock *lock)
 {
-       if( lock->dup ) {
-               lock->dup--;
+       if( *lock->dup ) {
+               *lock->dup -= 1;
                return;
        }
 
@@ -625,62 +776,6 @@ void ReadRelease (RWLock *lock)
 #endif
 }
 
-//     lite weight spin lock Latch Manager
-
-int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
-{
-       return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
-}
-
-void bt_mutexlock(BtMutexLatch *latch)
-{
-uint prev;
-
-  while( 1 ) {
-#ifdef  unix
-       prev = __sync_fetch_and_or(latch->exclusive, XCL);
-#else
-       prev = _InterlockedOr(latch->exclusive, XCL);
-#endif
-       if( !(prev & XCL) )
-               return;
-#ifdef  unix
-       sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
-#else
-       SwitchToThread();
-#endif
-  }
-}
-
-//     try to obtain write lock
-
-//     return 1 if obtained,
-//             0 otherwise
-
-int bt_mutextry(BtMutexLatch *latch)
-{
-uint prev;
-
-#ifdef  unix
-       prev = __sync_fetch_and_or(latch->exclusive, XCL);
-#else
-       prev = _InterlockedOr(latch->exclusive, XCL);
-#endif
-       //      take write access if exclusive bit is clear
-
-       return !(prev & XCL);
-}
-
-//     clear write mode
-
-void bt_releasemutex(BtMutexLatch *latch)
-{
-       *latch->exclusive = 0;
-#ifdef unix
-       sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
-#endif
-}
-
 //     recovery manager -- flush dirty pages
 
 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
@@ -692,7 +787,7 @@ uint entry;
 
   //   flush dirty pool pages to the btree
 
-fprintf(stderr, "Start flushlsn\n");
+fprintf(stderr, "Start flushlsn  ");
   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
        page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
        latch = mgr->latchsets + entry;
@@ -700,17 +795,18 @@ fprintf(stderr, "Start flushlsn\n");
        bt_lockpage(BtLockRead, latch, thread_no);
 
        if( latch->dirty ) {
-         bt_writepage(mgr, page, latch->page_no);
+         bt_writepage(mgr, page, latch->page_no, 0);
          latch->dirty = 0, cnt++;
        }
-if( latch->avail )
-cnt3++;
 if( latch->pin & ~CLOCK_bit )
 cnt2++;
        bt_unlockpage(BtLockRead, latch);
        bt_releasemutex (latch->modify);
   }
-fprintf(stderr, "End flushlsn %d pages  %d pinned  %d available\n", cnt, cnt2, cnt3);
+fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
+fprintf(stderr, "begin sync");
+       sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER);
+fprintf(stderr, " end sync\n");
 }
 
 //     recovery manager -- process current recovery buff on startup
@@ -746,36 +842,6 @@ BtVal *val;
   }
 }
 
-//     recovery manager -- dump current recovery buff & flush dirty pages
-//             in preparation for next recovery buffer.
-
-BTERR bt_dumpredo (BtMgr *mgr)
-{
-BtLogHdr *eof;
-fprintf(stderr, "Flush pages ");
-
-       eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
-       memset (eof, 0, sizeof(BtLogHdr));
-
-       //  flush pages written at beginning of this redo buffer
-       //      then write the redo buffer out to disk
-
-       fdatasync (mgr->idx);
-
-fprintf(stderr, "Dump ReDo: %d bytes\n", mgr->redoend);
-       pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
-
-       sync_file_range (mgr->idx, REDO_page << mgr->page_bits, mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
-
-       mgr->flushlsn = mgr->lsn;
-       mgr->redoend = 0;
-
-       eof = (BtLogHdr *)(mgr->redobuff);
-       memset (eof, 0, sizeof(BtLogHdr));
-       eof->lsn = mgr->lsn;
-       return 0;
-}
-
 //     recovery manager -- append new entry to recovery log
 //     flush to disk when it overflows.
 
@@ -784,7 +850,7 @@ logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ush
 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
 uint amt = sizeof(BtLogHdr);
 BtLogHdr *hdr, *eof;
-uint flush;
+uint last, end;
 
        bt_mutexlock (mgr->redo);
 
@@ -794,11 +860,12 @@ uint flush;
        //      see if new entry fits in buffer
        //      flush and reset if it doesn't
 
-       if( flush = amt > size - mgr->redoend ) {
-         bt_mutexlock (mgr->dump);
-
-         if( bt_dumpredo (mgr) )
-               return 0;
+       if( amt > size - mgr->redoend ) {
+         mgr->flushlsn = mgr->lsn;
+         msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
+         mgr->redolast = 0;
+         mgr->redoend = 0;
+         bt_flushlsn(mgr, thread_no);
        }
 
        //      fill in new entry & either eof or end block
@@ -822,13 +889,17 @@ uint flush;
          memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
        }
 
-       bt_releasemutex(mgr->redo);
+       eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
+       memset (eof, 0, sizeof(BtLogHdr));
+       eof->lsn = mgr->lsn;
 
-       if( flush ) {
-         bt_flushlsn (mgr, thread_no);
-         bt_releasemutex(mgr->dump);
-       }
+       last = mgr->redolast & 0xfff;
+       end = mgr->redoend;
+       mgr->redolast = end;
 
+       bt_releasemutex(mgr->redo);
+
+       msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
        return hdr->lsn;
 }
 
@@ -840,8 +911,8 @@ logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
 uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
 uint amt = 0, src, type;
 BtLogHdr *hdr, *eof;
+uint last, end;
 logseqno lsn;
-uint flush;
 BtKey *key;
 BtVal *val;
 
@@ -859,11 +930,12 @@ BtVal *val;
        //      see if new entry fits in buffer
        //      flush and reset if it doesn't
 
-       if( flush = amt > size - mgr->redoend ) {
-         bt_mutexlock (mgr->dump);
-
-         if( bt_dumpredo (mgr) )
-               return 0;
+       if( amt > size - mgr->redoend ) {
+         mgr->flushlsn = mgr->lsn;
+         msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
+         mgr->redolast = 0;
+         mgr->redoend = 0;
+         bt_flushlsn (mgr, thread_no);
        }
 
        //      assign new lsn to transaction
@@ -886,9 +958,6 @@ BtVal *val;
          case Delete:
                type = BTRM_del;
                break;
-         case Update:
-               type = BTRM_upd;
-               break;
          }
 
          amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
@@ -910,14 +979,15 @@ BtVal *val;
 
        eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
        memset (eof, 0, sizeof(BtLogHdr));
+       eof->lsn = lsn;
 
+       last = mgr->redolast & 0xfff;
+       end = mgr->redoend;
+       mgr->redolast = end;
        bt_releasemutex(mgr->redo);
 
-       if( flush ) {
-         bt_flushlsn (mgr, thread_no);
-         bt_releasemutex(mgr->dump);
-       }
-
+       msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
+       bt_releasemutex(mgr->redo);
        return lsn;
 }
 
@@ -927,60 +997,82 @@ BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
 {
 off64_t off = page_no << mgr->page_bits;
 
-#ifdef unix
        if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
                fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
-               return mgr->err = BTERR_read;
-       }
-#else
-OVERLAPPED ovl[1];
-uint amt[1];
-
-       memset (ovl, 0, sizeof(OVERLAPPED));
-       ovl->Offset = off;
-       ovl->OffsetHigh = off >> 32;
-
-       if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
-               fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError());
                return BTERR_read;
        }
-       if( *amt <  mgr->page_size ) {
-               fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
-               return BTERR_read;
-       }
-#endif
+if( page->page_no != page_no )
+abort();
        mgr->reads++;
        return 0;
+/*
+int flag = PROT_READ | PROT_WRITE;
+uint segment = page_no >> 32;
+unsigned char *perm;
+
+  while( 1 ) {
+       if( segment < mgr->segments ) {
+         perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
+               memcpy (page, perm, mgr->page_size);
+if( page->page_no != page_no )
+abort();
+               mgr->reads++;
+               return 0;
+       }
+
+       bt_mutexlock (mgr->maps);
+
+       if( segment < mgr->segments ) {
+               bt_releasemutex (mgr->maps);
+               continue;
+       }
+
+       mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
+       mgr->segments++;
+
+       bt_releasemutex (mgr->maps);
+  }  */
 }
 
 //     write page to permanent location in Btree file
 //     clear the dirty bit
 
-BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
+BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit)
 {
 off64_t off = page_no << mgr->page_bits;
 
-       page->page_no = page_no;
-
-#ifdef unix
-       if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
+       if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size ) {
+               fprintf (stderr, "Unable to write page %d errno = %d\n", page_no, errno);
                return BTERR_wrt;
-#else
-OVERLAPPED ovl[1];
-uint amt[1];
+       }
+       mgr->writes++;
+       return 0;
+/*
+int flag = PROT_READ | PROT_WRITE;
+uint segment = page_no >> 32;
+unsigned char *perm;
 
-       memset (ovl, 0, sizeof(OVERLAPPED));
-       ovl->Offset = off;
-       ovl->OffsetHigh = off >> 32;
+  while( 1 ) {
+       if( segment < mgr->segments ) {
+         perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
+               memcpy (perm, page, mgr->page_size);
+               if( syncit )
+                       msync (perm, mgr->page_size, MS_SYNC);
+               mgr->writes++;
+               return 0;
+       }
 
-       if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
-               return BTERR_wrt;
+       bt_mutexlock (mgr->maps);
 
-       if( *amt <  mgr->page_size )
-               return BTERR_wrt;
-#endif
-       mgr->writes++;
-       return 0;
+       if( segment < mgr->segments ) {
+               bt_releasemutex (mgr->maps);
+               continue;
+       }
+
+       mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
+       bt_releasemutex (mgr->maps);
+       mgr->segments++;
+  } */
 }
 
 //     set CLOCK bit in latch
@@ -992,26 +1084,10 @@ void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
        latch->pin |= CLOCK_bit;
        latch->pin--;
 
-       // if not doing redo recovery
-       //      set latch available
-
-       if( mgr->redopages )
-         if( !(latch->pin & ~CLOCK_bit) ) {
-               latch->avail = 1;
-#ifdef unix
-               __sync_fetch_and_add (&mgr->available, 1);
-#else
-               _InterlockedIncrement(&mgr->available);
-#endif
-       }
-
        bt_releasemutex(latch->modify);
 }
 
 //  return the btree cached page address
-//     if page is dirty and has not yet been
-//     flushed to disk for the current redo
-//     recovery buffer, write it out.
 
 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
 {
@@ -1023,24 +1099,16 @@ BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool);
 //  return next available latch entry
 //       and with latch entry locked
 
-uint bt_availnext (BtMgr *mgr, ushort thread_id)
+uint bt_availnext (BtMgr *mgr)
 {
 BtLatchSet *latch;
 uint entry;
 
   while( 1 ) {
-
-  //  flush dirty pages if none are available
-  //   and we aren't doing redo recovery
-
-  if( !mgr->redopages )
-       if( !mgr->available )
-         bt_flushlsn (mgr, thread_id);
-
 #ifdef unix
-       entry = __sync_fetch_and_add (&mgr->latchavail, 1) + 1;
+       entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
 #else
-       entry = _InterlockedIncrement (&mgr->latchavail);
+       entry = _InterlockedIncrement (&mgr->latchvictim);
 #endif
        entry %= mgr->latchtotal;
 
@@ -1049,24 +1117,26 @@ uint entry;
 
        latch = mgr->latchsets + entry;
 
-       if( !latch->avail )
+       if( !bt_mutextry(latch->modify) )
                continue;
 
-       bt_mutexlock(latch->modify);
+       //  return this entry if it is not pinned
 
-       if( !latch->avail ) {
-               bt_releasemutex(latch->modify);
-               continue;
-       }
+       if( !latch->pin )
+               return entry;
+
+       //      if the CLOCK bit is set
+       //      reset it to zero.
 
-       return entry;
+       latch->pin &= ~CLOCK_bit;
+       bt_releasemutex(latch->modify);
   }
 }
 
-//     find available latchset
+//     pin page in buffer pool
 //     return with latchset pinned
 
-BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage loadit, ushort thread_id)
+BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
 {
 uint hashidx = page_no % mgr->latchhash;
 BtLatchSet *latch;
@@ -1085,34 +1155,25 @@ BtPage page;
   } while( entry = latch->next );
 
   //  found our entry: increment pin
-  //  remove from available status
 
   if( entry ) {
        latch = mgr->latchsets + entry;
        bt_mutexlock(latch->modify);
-       if( latch->avail )
-#ifdef unix
-         __sync_fetch_and_add (&mgr->available, -1);
-#else
-         _InterlockedDecrement(&mgr->available);
-#endif
-       latch->avail = 0;
        latch->pin |= CLOCK_bit;
        latch->pin++;
-
+if(contents)
+abort();
        bt_releasemutex(latch->modify);
        bt_releasemutex(mgr->hashtable[hashidx].latch);
        return latch;
   }
 
-  //  find and reuse entry from available set
+  //  find and reuse unpinned entry
 
 trynext:
 
-  if( entry = bt_availnext (mgr, thread_id) )
-       latch = mgr->latchsets + entry;
-  else
-       return mgr->line = __LINE__, mgr->err = BTERR_avail, NULL;
+  entry = bt_availnext (mgr);
+  latch = mgr->latchsets + entry;
 
   idx = latch->page_no % mgr->latchhash;
 
@@ -1140,27 +1201,21 @@ trynext:
     bt_releasemutex (mgr->hashtable[idx].latch);
    }
 
-  //  remove available status
-
-  latch->avail = 0;
-#ifdef unix
-  __sync_fetch_and_add (&mgr->available, -1);
-#else
-  _InterlockedDecrement(&mgr->available);
-#endif
   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
 
+  //  update permanent page area in btree from buffer pool
+  //   no read-lock is required since page is not pinned.
+
   if( latch->dirty )
-       if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
+       if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
          return mgr->line = __LINE__, NULL;
        else
          latch->dirty = 0;
 
-  if( loadit ) {
-       memcpy (page, loadit, mgr->page_size);
+  if( contents ) {
+       memcpy (page, contents, mgr->page_size);
        latch->dirty = 1;
-  } else
-       if( bt_readpage (mgr, page, page_no) )
+  } else if( bt_readpage (mgr, page, page_no) )
          return mgr->line = __LINE__, NULL;
 
   //  link page as head of hash table chain
@@ -1217,7 +1272,7 @@ uint slot;
                latch = mgr->latchsets + slot;
 
                if( latch->dirty ) {
-                       bt_writepage(mgr, page, latch->page_no);
+                       bt_writepage(mgr, page, latch->page_no, 0);
                        latch->dirty = 0, num++;
                }
        }
@@ -1240,6 +1295,9 @@ uint slot;
        fprintf(stderr, "%d buffer pool pages flushed\n", num);
 
 #ifdef unix
+       while( mgr->segments )
+               munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
+
        munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
        munmap (mgr->pagezero, mgr->page_size);
 #else
@@ -1368,15 +1426,19 @@ BtVal *val;
        // and page(s) of latches and page pool cache
 
        memset (pagezero, 0, 1 << bits);
+       pagezero->alloc->lvl = MIN_lvl - 1;
        pagezero->alloc->bits = mgr->page_bits;
        bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
+       pagezero->activepages = 2;
 
        //  initialize left-most LEAF page in
-       //      alloc->left.
+       //      alloc->left and count of active leaf pages.
 
        bt_putid (pagezero->alloc->left, LEAF_page);
 
-       if( bt_writepage (mgr, pagezero->alloc, 0) ) {
+       ftruncate (mgr->idx, REDO_page << mgr->page_bits);
+
+       if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) {
                fprintf (stderr, "Unable to create btree page zero\n");
                return bt_mgrclose (mgr), NULL;
        }
@@ -1385,7 +1447,8 @@ BtVal *val;
        pagezero->alloc->bits = mgr->page_bits;
 
        for( lvl=MIN_lvl; lvl--; ) {
-               slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
+       BtSlot *node = slotptr(pagezero->alloc, 1);
+               node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
                key = keyptr(pagezero->alloc, 1);
                key->len = 2;           // create stopper key
                key->key[0] = 0xff;
@@ -1396,13 +1459,14 @@ BtVal *val;
                val->len = lvl ? BtId : 0;
                memcpy (val->value, value, val->len);
 
-               pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
+               pagezero->alloc->min = node->off;
                pagezero->alloc->lvl = lvl;
                pagezero->alloc->cnt = 1;
                pagezero->alloc->act = 1;
+               pagezero->alloc->page_no = MIN_lvl - lvl;
 
-               if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
-                       fprintf (stderr, "Unable to create btree page zero\n");
+               if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) {
+                       fprintf (stderr, "Unable to create btree page\n");
                        return bt_mgrclose (mgr), NULL;
                }
        }
@@ -1429,8 +1493,10 @@ mgrlatch:
                fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
        }
-       if( mgr->redopages = redopages )
+       if( mgr->redopages = redopages ) {
+               ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits);
                mgr->redobuff = valloc (redopages * mgr->page_size);
+       }
 #else
        flag = PAGE_READWRITE;
        mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
@@ -1468,14 +1534,6 @@ mgrlatch:
        mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
        mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
 
-       //      mark all pool entries as available
-
-       for( idx = 1; idx < mgr->latchtotal; idx++ ) {
-               latch = mgr->latchsets + idx;
-               latch->avail = 1;
-               mgr->available++;
-       }
-
        return mgr;
 }
 
@@ -1599,13 +1657,15 @@ int blk;
        // use empty chain first
        // else allocate empty page
 
-       if( page_no = bt_getid(mgr->pagezero->chain) ) {
+       if( page_no = bt_getid(mgr->pagezero->freechain) ) {
                if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
                        set->page = bt_mappage (mgr, set->latch);
                else
-                       return mgr->err = BTERR_struct, mgr->line = __LINE__, -1;
+                       return mgr->line = __LINE__, mgr->err = BTERR_struct;
+
+               bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
+               mgr->pagezero->activepages++;
 
-               bt_putid(mgr->pagezero->chain, bt_getid(set->page->right));
                bt_releasemutex(mgr->lock);
 
                memcpy (set->page, contents, mgr->page_size);
@@ -1619,16 +1679,20 @@ int blk;
        // unlock allocation latch and
        //      extend file into new page.
 
+       mgr->pagezero->activepages++;
+
+       ftruncate (mgr->idx, (uid)(page_no + 1) << mgr->page_bits);
        bt_releasemutex(mgr->lock);
 
-       //      don't load cache from btree page
+       //      don't load cache from btree page, load it from contents
+
+       contents->page_no = page_no;
 
        if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
                set->page = bt_mappage (mgr, set->latch);
        else
                return mgr->err;
 
-       set->latch->dirty = 1;
        return 0;
 }
 
@@ -1778,20 +1842,22 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set)
 
        //      store chain
 
-       memcpy(set->page->right, mgr->pagezero->chain, BtId);
-       bt_putid(mgr->pagezero->chain, set->latch->page_no);
+       memcpy(set->page->right, mgr->pagezero->freechain, BtId);
+       bt_putid(mgr->pagezero->freechain, set->latch->page_no);
        set->latch->dirty = 1;
        set->page->free = 1;
 
+       // decrement active page count
+       // and unlock allocation page
+
+       mgr->pagezero->activepages--;
+       bt_releasemutex (mgr->lock);
+
        // unlock released page
 
        bt_unlockpage (BtLockDelete, set->latch);
        bt_unlockpage (BtLockWrite, set->latch);
        bt_unpinlatch (mgr, set->latch);
-
-       // unlock allocation page
-
-       bt_releasemutex (mgr->lock);
 }
 
 //     a fence key was deleted from a page
@@ -1824,7 +1890,7 @@ uint idx;
        bt_putid (value, set->latch->page_no);
        ptr = (BtKey*)leftkey;
 
-       if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
+       if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
          return mgr->err;
 
        //      now delete old fence key
@@ -1885,7 +1951,6 @@ uint idx;
 
 //  delete a page and manage keys
 //  call with page writelocked
-//     returns with page unpinned
 
 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
 {
@@ -1897,7 +1962,7 @@ uid page_no;
 BtKey *ptr;
 
        //      cache copy of fence key
-       //      to post in parent
+       //      to remove in parent
 
        ptr = keyptr(set->page, set->page->cnt);
        memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
@@ -1945,7 +2010,7 @@ BtKey *ptr;
        bt_putid (value, set->latch->page_no);
        ptr = (BtKey*)higherfence;
 
-       if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
+       if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
          return mgr->err;
 
        //      delete old lower key to our node
@@ -1963,7 +2028,6 @@ BtKey *ptr;
        bt_freepage (mgr, right);
 
        bt_unlockpage (BtLockParent, set->latch);
-       bt_unpinlatch (mgr, set->latch);
        return 0;
 }
 
@@ -1974,33 +2038,43 @@ BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort t
 {
 uint slot, idx, found, fence;
 BtPageSet set[1];
+BtSlot *node;
 BtKey *ptr;
 BtVal *val;
 
-       if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) )
+       if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
+               node = slotptr(set->page, slot);
                ptr = keyptr(set->page, slot);
-       else
+       else
                return mgr->err;
 
        // if librarian slot, advance to real slot
 
-       if( slotptr(set->page, slot)->type == Librarian )
+       if( node->type == Librarian ) {
                ptr = keyptr(set->page, ++slot);
+               node = slotptr(set->page, slot);
+       }
 
        fence = slot == set->page->cnt;
 
-       // if key is found delete it, otherwise ignore request
+       // delete the key, ignore request if already dead
 
        if( found = !keycmp (ptr, key, len) )
-         if( found = slotptr(set->page, slot)->dead == 0 ) {
+         if( found = node->dead == 0 ) {
                val = valptr(set->page,slot);
-               slotptr(set->page, slot)->dead = 1;
                set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
                set->page->act--;
 
+               //  mark node type as delete
+
+               node->type = Delete;
+               node->dead = 1;
+
                // collapse empty slots beneath the fence
+               // on interiour nodes
 
-               while( idx = set->page->cnt - 1 )
+               if( lvl )
+                while( idx = set->page->cnt - 1 )
                  if( slotptr(set->page, idx)->dead ) {
                        *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
                        memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
@@ -2026,11 +2100,14 @@ BtVal *val;
 
        //      delete empty page
 
-       if( !set->page->act )
-               return bt_deletepage (mgr, set, thread_no);
+       if( !set->page->act ) {
+         if( bt_deletepage (mgr, set, thread_no) )
+               return mgr->err;
+       } else {
+         set->latch->dirty = 1;
+         bt_unlockpage(BtLockWrite, set->latch);
+       }
 
-       set->latch->dirty = 1;
-       bt_unlockpage(BtLockWrite, set->latch);
        bt_unpinlatch (mgr, set->latch);
        return 0;
 }
@@ -2278,6 +2355,8 @@ BtVal *val;
        root->page->act = 2;
        root->page->lvl++;
 
+       mgr->pagezero->alloc->lvl = root->page->lvl;
+
        // release and unpin root pages
 
        bt_unlockpage(BtLockWrite, root->latch);
@@ -2440,7 +2519,7 @@ BtKey *ptr;
        bt_putid (value, set->latch->page_no);
        ptr = (BtKey *)leftkey;
 
-       if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
+       if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
                return mgr->err;
 
        // switch fence for right block of larger keys to new right page
@@ -2448,7 +2527,7 @@ BtKey *ptr;
        bt_putid (value, right->page_no);
        ptr = (BtKey *)rightkey;
 
-       if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
+       if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
                return mgr->err;
 
        bt_unlockpage (BtLockParent, set->latch);
@@ -2537,37 +2616,19 @@ BtVal *val;
 //  Insert new key into the btree at given level.
 //     either add a new key or update/add an existing one
 
-BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique, ushort thread_no)
+BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
 {
-unsigned char newkey[BT_keyarray];
 uint slot, idx, len, entry;
 BtPageSet set[1];
-BtKey *ptr, *ins;
-uid sequence;
+BtSlot *node;
+BtKey *ptr;
 BtVal *val;
-uint type;
-
-  // set up the key we're working on
-
-  ins = (BtKey*)newkey;
-  memcpy (ins->key, key, keylen);
-  ins->len = keylen;
-
-  // is this a non-unique index value?
-
-  if( unique )
-       type = Unique;
-  else {
-       type = Duplicate;
-       sequence = bt_newdup (mgr);
-       bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
-       ins->len += BtId;
-  }
 
   while( 1 ) { // find the page and slot for the current key
-       if( slot = bt_loadpage (mgr, set, ins->key, ins->len, lvl, BtLockWrite, thread_no) )
+       if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
+               node = slotptr(set->page, slot);
                ptr = keyptr(set->page, slot);
-       else {
+       else {
                if( !mgr->err )
                        mgr->line = __LINE__, mgr->err = BTERR_ovflw;
                return mgr->err;
@@ -2575,80 +2636,86 @@ uint type;
 
        // if librarian slot == found slot, advance to real slot
 
-       if( slotptr(set->page, slot)->type == Librarian )
-         if( !keycmp (ptr, key, keylen) )
+       if( node->type == Librarian )
+         if( !keycmp (ptr, key, keylen) ) {
                ptr = keyptr(set->page, ++slot);
+               node = slotptr(set->page, slot);
+         }
 
-       len = ptr->len;
-
-       if( slotptr(set->page, slot)->type == Duplicate )
-               len -= BtId;
-
-       //  if inserting a duplicate key or unique key
+       //  if inserting a duplicate key or unique
+       //      key that doesn't exist on the page,
        //      check for adequate space on the page
        //      and insert the new key before slot.
 
-       if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
-         if( !(slot = bt_cleanpage (mgr, set, ins->len, slot, vallen)) )
-               if( !(entry = bt_splitpage (mgr, set, thread_no)) )
-                 return mgr->err;
-               else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
-                 return mgr->err;
-               else
-                 continue;
-
-         return bt_insertslot (mgr, set, slot, ins->key, ins->len, value, vallen, type, 1);
-       }
+       switch( type ) {
+       case Unique:
+       case Duplicate:
+         if( keycmp (ptr, key, keylen) )
+          if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
+           return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
+          else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
+               return mgr->err;
+          else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
+               return mgr->err;
+          else
+               continue;
 
-       // if key already exists, update value and return
+         // if key already exists, update value and return
 
-       val = valptr(set->page, slot);
+         val = valptr(set->page, slot);
 
-       if( val->len >= vallen ) {
+         if( val->len >= vallen ) {
                if( slotptr(set->page, slot)->dead )
                        set->page->act++;
+               node->type = type;
+               node->dead = 0;
+
                set->page->garbage += val->len - vallen;
                set->latch->dirty = 1;
-               slotptr(set->page, slot)->dead = 0;
                val->len = vallen;
                memcpy (val->value, value, vallen);
                bt_unlockpage(BtLockWrite, set->latch);
                bt_unpinlatch (mgr, set->latch);
                return 0;
-       }
+         }
 
-       //      new update value doesn't fit in existing value area
+         //  new update value doesn't fit in existing value area
+         //    make sure page has room
 
-       if( !slotptr(set->page, slot)->dead )
+         if( !node->dead )
                set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
-       else {
-               slotptr(set->page, slot)->dead = 0;
+         else
                set->page->act++;
-       }
 
-       if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
-         if( !(entry = bt_splitpage (mgr, set, thread_no)) )
+         node->type = type;
+         node->dead = 0;
+
+         if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
+          if( !(entry = bt_splitpage (mgr, set, thread_no)) )
                return mgr->err;
-         else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
+          else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
                return mgr->err;
-         else
+          else
                continue;
 
-       set->page->min -= vallen + sizeof(BtVal);
-       val = (BtVal*)((unsigned char *)set->page + set->page->min);
-       memcpy (val->value, value, vallen);
-       val->len = vallen;
+         //  copy key and value onto page and update slot
 
-       set->latch->dirty = 1;
-       set->page->min -= keylen + sizeof(BtKey);
-       ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
-       memcpy (ptr->key, key, keylen);
-       ptr->len = keylen;
+         set->page->min -= vallen + sizeof(BtVal);
+         val = (BtVal*)((unsigned char *)set->page + set->page->min);
+         memcpy (val->value, value, vallen);
+         val->len = vallen;
+
+         set->latch->dirty = 1;
+         set->page->min -= keylen + sizeof(BtKey);
+         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
+         memcpy (ptr->key, key, keylen);
+         ptr->len = keylen;
        
-       slotptr(set->page, slot)->off = set->page->min;
-       bt_unlockpage(BtLockWrite, set->latch);
-       bt_unpinlatch (mgr, set->latch);
-       return 0;
+         node->off = set->page->min;
+         bt_unlockpage(BtLockWrite, set->latch);
+         bt_unpinlatch (mgr, set->latch);
+         return 0;
+    }
   }
   return 0;
 }
@@ -2745,32 +2812,42 @@ uint entry, slot;
   return mgr->line = __LINE__, mgr->err = BTERR_atomic;
 }
 
+//     perform delete from smaller btree
+//  insert a delete slot if not found there
+
 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
 {
 BtKey *key = keyptr(source, src);
 BtPageSet set[1];
 uint idx, slot;
+BtSlot *node;
 BtKey *ptr;
 BtVal *val;
 
-       if( slot = bt_atomicpage (mgr, source, locks, src, set) )
+       if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
+         node = slotptr(set->page, slot);
          ptr = keyptr(set->page, slot);
-       else
+         val = valptr(set->page, slot);
+       } else
          return mgr->line = __LINE__, mgr->err = BTERR_struct;
 
-       if( !keycmp (ptr, key->key, key->len) )
-         if( !slotptr(set->page, slot)->dead )
-               slotptr(set->page, slot)->dead = 1;
-         else
-               return 0;
-       else
+       //  if slot is not found, insert a delete slot
+
+       if( keycmp (ptr, key->key, key->len) )
+         return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
+
+       //      if node is already dead,
+       //      ignore the request.
+
+       if( node->dead )
                return 0;
 
-       val = valptr(set->page, slot);
        set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
        set->latch->dirty = 1;
        set->page->lsn = locks[src].lsn;
        set->page->act--;
+
+       node->dead = 0;
        mgr->found++;
        return 0;
 }
@@ -2820,7 +2897,7 @@ BtKey *ptr;
        ptr = keyptr(right->page,right->page->cnt);
        bt_putid (value, prev->latch->page_no);
 
-       if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, 1, thread_no) )
+       if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
                return mgr->err;
 
        //  now that master page is in good shape we can
@@ -2835,6 +2912,8 @@ BtKey *ptr;
        if( right_page_no = bt_getid (prev->page->right) ) {
          if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
                temp->page = bt_mappage (mgr, temp->latch);
+         else
+               return mgr->err;
 
          bt_lockpage (BtLockWrite, temp->latch, thread_no);
          bt_putid (temp->page->left, prev->latch->page_no);
@@ -2859,181 +2938,6 @@ BtKey *ptr;
        return 0;
 }
 
-//     find and add the next available latch entry
-//     to the queue
-
-BTERR bt_txnavaillatch (BtDb *bt)
-{
-BtLatchSet *latch;
-uint startattempt;
-uint cnt, entry;
-uint hashidx;
-BtPage page;
-
-  //  find and reuse previous entry on victim
-
-  startattempt = bt->mgr->latchvictim;
-
-  while( 1 ) {
-#ifdef unix
-       entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
-#else
-       entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
-#endif
-       //      skip entry if it has outstanding pins
-
-       entry %= bt->mgr->latchtotal;
-
-       if( !entry )
-               continue;
-
-       //      only go around one time before
-       //      flushing redo recovery buffer,
-       //      and the buffer pool to free up entries.
-
-       if( bt->mgr->redopages )
-         if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
-           if( bt_mutextry (bt->mgr->dump) ) {
-                if( bt_dumpredo (bt->mgr) )
-                 return bt->mgr->err;
-                bt_flushlsn (bt->mgr, bt->thread_no);
-               // synchronize the various threads running into this condition
-               //      so that only one thread does the dump and flush
-               } else
-                       bt_mutexlock(bt->mgr->dump);
-
-               startattempt = bt->mgr->latchvictim;
-               bt_releasemutex(bt->mgr->dump);
-         }
-
-       latch = bt->mgr->latchsets + entry;
-
-       if( latch->avail )
-         continue;
-
-       bt_mutexlock(latch->modify);
-
-       //      skip if already an available entry
-
-       if( latch->avail ) {
-         bt_releasemutex(latch->modify);
-         continue;
-       }
-
-       //  skip this entry if it is pinned
-       //      if the CLOCK bit is set
-       //      reset it to zero.
-
-       if( latch->pin ) {
-         latch->pin &= ~CLOCK_bit;
-         bt_releasemutex(latch->modify);
-         continue;
-       }
-
-       page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
-
-       //      if dirty page has lsn >= last redo recovery buffer
-       //      then hold page in the buffer pool until next redo
-       //      recovery buffer is being written to disk.
-
-       if( latch->dirty )
-         if( page->lsn >= bt->mgr->flushlsn ) {
-           bt_releasemutex(latch->modify);
-               continue;
-         }
-
-       //      entry is now available
-#ifdef unix
-       __sync_fetch_and_add (&bt->mgr->available, 1);
-#else
-       _InterlockedIncrement(&bt->mgr->available);
-#endif
-       latch->avail = 1;
-       bt_releasemutex(latch->modify);
-       return 0;
-  }
-}
-
-//     release available latch requests
-
-void bt_txnavailrelease (BtDb *bt, uint count)
-{
-#ifdef unix
-       __sync_fetch_and_add(bt->mgr->availlock, -count);
-#else
-       _InterlockedAdd(bt->mgr->availlock, -count);
-#endif
-}
-
-//     promote page of keys from first btree
-//  into main btree
-
-BTERR bt_txnavailmain (BtDb *bt)
-{
-BtLatchSet *latch;
-uint entry;
-
-  while( 1 ) {
-#ifdef unix
-       entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
-#else
-       entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
-#endif
-       //      skip entry if it has outstanding pins
-
-       entry %= bt->mgr->latchtotal;
-
-       if( !entry )
-               continue;
-
-       latch = bt->mgr->latchsets + entry;
-
-       if( latch->avail )
-         continue;
-
-       bt_mutexlock(latch->modify);
-
-       //      skip if already an available entry
-
-       if( latch->avail ) {
-         bt_releasemutex(latch->modify);
-         continue;
-       }
-
-       //  skip this entry if it is pinned
-       //      if the CLOCK bit is set
-       //      reset it to zero.
-
-       if( latch->pin ) {
-         latch->pin &= ~CLOCK_bit;
-         bt_releasemutex(latch->modify);
-         continue;
-       }
-
-  }
-}
-
-//     commit available pool entries
-//     find available entries as required
-
-BTERR bt_txnavailrequest (BtDb *bt, uint count)
-{
-#ifdef unix
-       __sync_fetch_and_add(bt->mgr->availlock, count);
-#else
-       _InterlockedAdd(bt->mgr->availlock, count);
-#endif
-
-       //      find another available pool entry
-
-       while( *bt->mgr->availlock > bt->mgr->available )
-         if( bt->mgr->redopages )
-               bt_txnavaillatch (bt);
-         else
-               if( bt_txnavailmain (bt) )
-                       return bt->mgr->err;
-}
-
 //     atomic modification of a batch of keys.
 
 //     return -1 if BTERR is set
@@ -3041,9 +2945,9 @@ BTERR bt_txnavailrequest (BtDb *bt, uint count)
 //     causing the key constraint violation
 //     or zero on successful completion.
 
-int bt_atomictxn (BtDb *bt, BtPage source)
+BTERR bt_atomictxn (BtDb *bt, BtPage source)
 {
-uint src, idx, slot, samepage, entry, avail, que = 0;
+uint src, idx, slot, samepage, entry, que = 0;
 AtomicKey *head, *tail, *leaf;
 BtPageSet set[1], prev[1];
 unsigned char value[BtId];
@@ -3079,12 +2983,14 @@ int type;
        }
   }
 
-  // reserve enough buffer pool entries
-
-  avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;
+  //  add entries to redo log
 
-  if( bt_txnavailrequest (bt, avail) )
-       return bt->mgr->err;
+  if( bt->mgr->redopages )
+   if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
+    for( src = 0; src++ < source->cnt; )
+        locks[src].lsn = lsn;
+    else
+        return bt->mgr->err;
 
   // Load the leaf page for each key
   // group same page references with reuse bit
@@ -3108,7 +3014,7 @@ int type;
          if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
                set->latch->split = 0;
          else
-               goto atomicerr;
+               return bt->mgr->err;
 
        if( slotptr(set->page, slot)->type == Librarian )
          ptr = keyptr(set->page, ++slot);
@@ -3128,34 +3034,6 @@ int type;
        //      capture current lsn for master page
 
        locks[src].reqlsn = set->page->lsn;
-
-       //      perform constraint checks
-
-       switch( slotptr(source, src)->type ) {
-       case Duplicate:
-       case Unique:
-         if( !slotptr(set->page, slot)->dead )
-          if( slot < set->page->cnt || bt_getid (set->page->right) )
-           if( !keycmp (ptr, key->key, key->len) ) {
-
-                 // return constraint violation if key already exists
-
-                 bt_unlockpage(BtLockRead, set->latch);
-                 result = src;
-
-                 while( src ) {
-                       if( locks[src].entry ) {
-                         set->latch = bt->mgr->latchsets + locks[src].entry;
-                         bt_unlockpage(BtLockAtomic, set->latch);
-                         bt_unpinlatch (bt->mgr, set->latch);
-                       }
-                       src--;
-                 }
-                 free (locks);
-                 return result;
-               }
-         break;
-       }
   }
 
   //  unlock last loadpage lock
@@ -3163,22 +3041,15 @@ int type;
   if( source->cnt )
        bt_unlockpage(BtLockRead, set->latch);
 
-  //  and add entries to redo log
-
-  if( bt->mgr->redopages )
-   if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
-    for( src = 0; src++ < source->cnt; )
-        locks[src].lsn = lsn;
-    else
-        goto atomicerr;
-
   //  obtain write lock for each master page
+  //  sync flushed pages to disk
 
   for( src = 0; src++ < source->cnt; ) {
        if( locks[src].reuse )
          continue;
-       else
-         bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry, bt->thread_no);
+
+       set->latch = bt->mgr->latchsets + locks[src].entry;
+       bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
   }
 
   // insert or delete each key
@@ -3204,13 +3075,13 @@ int type;
         switch( slotptr(source,idx)->type ) {
         case Delete:
          if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
-               goto atomicerr;
+               return bt->mgr->err;
          break;
 
        case Duplicate:
        case Unique:
          if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
-               goto atomicerr;
+               return bt->mgr->err;
          break;
        }
 
@@ -3293,7 +3164,7 @@ int type;
                if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
                  set->page = bt_mappage (bt->mgr, set->latch);
                else
-                 goto atomicerr;
+                 return bt->mgr->err;
 
            bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
            bt_putid (set->page->left, prev->latch->page_no);
@@ -3350,7 +3221,7 @@ int type;
        ptr = keyptr(prev->page,prev->page->cnt);
 
        if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
-               goto atomicerr;
+               return bt->mgr->err;
 
        //      perform the remainder of the delete
        //      from the FIFO queue
@@ -3376,13 +3247,6 @@ int type;
        bt_unlockpage(BtLockWrite, prev->latch);
   }
 
-  bt_txnavailrelease (bt, avail);
-
-  que *= bt->mgr->pagezero->alloc->lvl;
-
-  if( bt_txnavailrequest (bt, que) )
-       return bt->mgr->err;
-
   //  add & delete keys for any pages split or merged during transaction
 
   if( leaf = head )
@@ -3395,20 +3259,20 @@ int type;
 
          switch( leaf->type ) {
          case 0:       // insert key
-           if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, 1, bt->thread_no) )
-                 goto atomicerr;
+           if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) )
+                 return bt->mgr->err;
 
                break;
 
          case 1:       // delete key
                if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
-                 goto atomicerr;
+                 return bt->mgr->err;
 
                break;
 
          case 2:       // free page
                if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
-                 goto atomicerr;
+                 return bt->mgr->err;
 
                break;
          }
@@ -3421,14 +3285,131 @@ int type;
          free (leaf);
        } while( leaf = tail );
 
-  bt_txnavailrelease (bt, que);
+  // if number of active pages
+  // is greater than the buffer pool
+  // promote page into larger btree
+
+  while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
+       if( bt_txnpromote (bt) )
+         return bt->mgr->err;
 
   // return success
 
   free (locks);
   return 0;
-atomicerr:
-  return -1;
+}
+
+//  promote a page into the larger btree
+
+BTERR bt_txnpromote (BtDb *bt)
+{
+uint entry, slot, idx;
+BtPageSet set[1];
+BtSlot *node;
+BtKey *ptr;
+BtVal *val;
+
+  while( 1 ) {
+#ifdef unix
+       entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
+#else
+       entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
+#endif
+       entry %= bt->mgr->latchtotal;
+
+       if( !entry )
+               continue;
+
+       set->latch = bt->mgr->latchsets + entry;
+       idx = set->latch->page_no % bt->mgr->latchhash;
+
+       if( !bt_mutextry(set->latch->modify) )
+               continue;
+
+//    if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
+//             bt_releasemutex(set->latch->modify);
+//             continue;
+//     }
+
+       //  skip this entry if it is pinned
+
+       if( set->latch->pin & ~CLOCK_bit ) {
+         bt_releasemutex(set->latch->modify);
+//      bt_releasemutex(bt->mgr->hashtable[idx].latch);
+         continue;
+       }
+
+       set->page = bt_mappage (bt->mgr, set->latch);
+
+       // entry never used or has no right sibling
+
+       if( !set->latch->page_no || !bt_getid (set->page->right) ) {
+         bt_releasemutex(set->latch->modify);
+//      bt_releasemutex(bt->mgr->hashtable[idx].latch);
+         continue;
+       }
+
+       bt_lockpage (BtLockAccess, set->latch, bt->thread_no);
+       bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
+    bt_unlockpage(BtLockAccess, set->latch);
+
+       // entry interiour node or being or was killed
+
+       if( set->page->lvl || set->page->free || set->page->kill ) {
+         bt_releasemutex(set->latch->modify);
+//      bt_releasemutex(bt->mgr->hashtable[idx].latch);
+      bt_unlockpage(BtLockWrite, set->latch);
+         continue;
+       }
+
+       //  pin the page for our useage
+
+       set->latch->pin++;
+       bt_releasemutex(set->latch->modify);
+//    bt_releasemutex(bt->mgr->hashtable[idx].latch);
+
+       //      if page is dirty, then
+       //      sync it to the disk first.
+
+       if( set->latch->dirty )
+        if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) )
+         return bt->mgr->line = __LINE__, bt->mgr->err;
+        else
+         set->latch->dirty = 0;
+
+       // transfer slots in our selected page to larger btree
+if( !(set->latch->page_no % 100) )
+fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt);
+
+       for( slot = 0; slot++ < set->page->cnt; ) {
+               ptr = keyptr (set->page, slot);
+               val = valptr (set->page, slot);
+               node = slotptr(set->page, slot);
+
+               switch( node->type ) {
+               case Duplicate:
+               case Unique:
+                 if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) )
+                       return bt->main->err;
+
+                 continue;
+
+               case Delete:
+                 if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) )
+                       return bt->main->err;
+
+                 continue;
+               }
+       }
+
+       //  now delete the page
+
+       if( bt_deletepage (bt->mgr, set, bt->thread_no) )
+               return bt->mgr->err;
+
+       bt_unpinlatch (bt->mgr, set->latch);
+       return 0;
+  }
 }
 
 //     set cursor to highest slot on highest page
@@ -3628,14 +3609,14 @@ uint entry = 0;
                if( *latch->access->rin & MASK )
                        fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
 
-               if( *latch->parent->exclusive )
-                       fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
+//             if( *latch->parent->xcl->value )
+//                     fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
 
-               if( *latch->atomic->exclusive )
-                       fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
+//             if( *latch->atomic->xcl->value )
+//                     fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
 
-               if( *latch->modify->exclusive )
-                       fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
+//             if( *latch->modify->value )
+//                     fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
 
                if( latch->pin & ~CLOCK_bit )
                        fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
@@ -3709,7 +3690,7 @@ FILE *in;
                          line++;
 
                          if( !args->num ) {
-                           if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, 1, bt->thread_no) )
+                           if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
                                  fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
                            len = 0;
                                continue;
@@ -3753,7 +3734,7 @@ FILE *in;
                        {
                          line++;
 
-                         if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, 1, bt->thread_no) )
+                         if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
                                fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
                          len = 0;
                        }
@@ -3808,7 +3789,6 @@ FILE *in;
                                cnt++;
                           }
 
-                       set->latch->avail = 1;
                        bt_unlockpage (BtLockRead, set->latch);
                        bt_unpinlatch (bt->mgr, set->latch);
                } while( page_no = next );
@@ -3844,7 +3824,7 @@ FILE *in;
                posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
 #endif
                fprintf(stderr, "started counting\n");
-               page_no = LEAF_page;
+               next = LEAF_page + bt->mgr->redopages + 1;
 
                while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
                        if( bt_readpage (bt->mgr, bt->cursor, page_no) )
@@ -3853,7 +3833,8 @@ FILE *in;
                        if( !bt->cursor->free && !bt->cursor->lvl )
                                cnt += bt->cursor->act;
 
-                       page_no++;
+                       bt->mgr->reads++;
+                       page_no = next++;
                }
                
                cnt--;  // remove stopper key