]> pd.if.org Git - btree/blobdiff - threadskv10g.c
Fix small bug in main when there is less t han one input file
[btree] / threadskv10g.c
index 4e11b5b7f6ec4715e8d7db27eeb337db112a1df0..42dfea23961f4a90a816561c9cd050fa049389e8 100644 (file)
@@ -222,18 +222,15 @@ typedef struct {
 typedef struct BtPage_ {
        uint cnt;                                       // count of keys in page
        uint act;                                       // count of active keys
-       uint min;                                       // next key offset
+       uint min;                                       // next key/value offset
+       uint fence;                                     // page fence key offset
        uint garbage;                           // page garbage in bytes
-       unsigned char lvl;                      // level of page
-       unsigned char free;                     // page is on free chain
+       unsigned char lvl;                      // level of page, zero = leaf
+       unsigned char free;                     // page is on the free chain
        unsigned char kill;                     // page is being deleted
        unsigned char nopromote;        // page is being constructed
-       unsigned char filler1[6];       // padding to multiple of 8 bytes
-       unsigned char right[BtId];      // page number to right
-       unsigned char left[BtId];       // page number to left
-       unsigned char filler2[2];       // padding to multiple of 8 bytes
+       uid right, left;                        // page numbers to right and left
        logseqno lsn;                           // log sequence number applied
-       uid page_no;                            // this page number
 } *BtPage;
 
 //  The loadpage interface object
@@ -247,8 +244,8 @@ typedef struct {
 
 typedef struct {
        struct BtPage_ alloc[1];                // next page_no in right ptr
-       unsigned char freechain[BtId];  // head of free page_nos chain
-       unsigned char leafchain[BtId];  // head of leaf page_nos chain
+       uid freechain;                                  // head of free page_nos chain
+       uid leafchain;                                  // head of leaf page_nos chain
        unsigned long long leafpages;   // number of active leaf pages
        unsigned long long upperpages;  // number of active upper pages
        uint redopages;                                 // number of redo pages in file
@@ -278,17 +275,16 @@ typedef struct {
        logseqno lsn, flushlsn;         // current & first lsn flushed
        MutexLatch redo[1];                     // redo area lite latch
        MutexLatch lock[1];                     // allocation area lite latch
-       MutexLatch maps[1];                     // mapping segments lite latch
        ushort thread_no[1];            // highest thread number issued
        ushort err_thread;                      // error thread number
        uint nlatchpage;                        // size of buffer pool & latchsets
        uint latchtotal;                        // number of page latch entries
        uint latchhash;                         // number of latch hash table slots
-       uint latchvictim;                       // next latch entry to examine
+       uint latchvictim;                       // next latch entry to swap out
        uint nleafpage;                         // size of leaf pool & leafsets
        uint leaftotal;                         // number of leaf latch entries
        uint leafhash;                          // number of leaf hash table slots
-       uint leafvictim;                        // next leaf entry to examine
+       uint leafvictim;                        // next leaf entry to swap out
        uint leafpromote;                       // next leaf entry to promote
        uint redopage;                          // page number of redo buffer
        uint redolast;                          // last msync size of recovery buff
@@ -297,12 +293,12 @@ typedef struct {
        int line;                                       // last error line no
        int found;                                      // number of keys found by delete
        int reads, writes;                      // number of reads and writes
+       int type;                                       // type of LSM tree 0=cache, 1=main
 #ifndef unix
        HANDLE halloc;                          // allocation handle
        HANDLE hpool;                           // buffer pool handle
 #endif
-       volatile uint segments;         // number of memory mapped segments
-       unsigned char *pages[64000];// memory mapped segments of b-tree
+       unsigned char *page0;           // memory mapped pagezero of b-tree
 } BtMgr;
 
 typedef struct {
@@ -329,6 +325,7 @@ typedef struct {
        uint entry:31;          // latch table entry number
        uint reuse:1;           // reused previous page
        uint slot;                      // slot on page
+       uint src;                       // source slot
 } AtomicTxn;
 
 //     Catastrophic errors
@@ -396,7 +393,7 @@ extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *v
 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
 
 //     atomic transaction functions
-BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
+BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, logseqno lsn, int lsm, ushort thread_no);
 BTERR bt_promote (BtDb *bt);
 
 //  The page is allocated from low and hi ends.
@@ -448,6 +445,7 @@ BTERR bt_promote (BtDb *bt);
 #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
+#define fenceptr(page) ((BtKey*)((unsigned char*)(page) + page->fence))
 
 void bt_putid(unsigned char *dest, uid id)
 {
@@ -600,11 +598,6 @@ cnt2++;
        bt_releasemutex (latch->modify);
   }
 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
-fprintf(stderr, "begin sync");
-       for( segment = 0; segment < mgr->segments; segment++ )
-         if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
-               fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
-fprintf(stderr, " end sync\n");
 }
 
 //     recovery manager -- process current recovery buff on startup
@@ -795,71 +788,19 @@ BtVal *val;
        return lsn;
 }
 
-//     sync a single btree page to disk
-
-BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
-{
-uint segment = latch->page_no >> 16;
-uint page_size = mgr->page_size;
-BtPage perm;
-
-       if( bt_writepage (mgr, page, latch->page_no, latch->leaf) )
-               return mgr->err;
-
-       if( !page->lvl )
-               page_size <<= mgr->leaf_xtra;
-
-       perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
-
-       if( msync (perm, page_size, MS_SYNC) < 0 )
-         fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
-
-       latch->dirty = 0;
-       return 0;
-}
-
 //     read page into buffer pool from permanent location in Btree file
 
 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
 {
-int flag = PROT_READ | PROT_WRITE;
 uint page_size = mgr->page_size;
-uint off = 0, segment, fragment;
-unsigned char *perm;
 
   if( leaf )
        page_size <<= mgr->leaf_xtra;
 
-  fragment = page_no & 0xffff;
-  segment = page_no >> 16;
-  mgr->reads++;
-
-  while( off < page_size ) {
-       if( fragment >> 16 )
-               segment++, fragment = 0;
-
-       if( segment < mgr->segments ) {
-         perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
-
-         memcpy ((unsigned char *)page + off, perm, mgr->page_size);
-         off += mgr->page_size;
-         fragment++;
-         continue;
-       }
-
-       bt_mutexlock (mgr->maps);
-
-       if( segment < mgr->segments ) {
-               bt_releasemutex (mgr->maps);
-               continue;
-       }
-
-       mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
-       mgr->segments++;
-
-       bt_releasemutex (mgr->maps);
-  }
+  if( pread(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
+       return mgr->err = BTERR_read;
 
+  __sync_fetch_and_add (&mgr->reads, 1);
   return 0;
 }
 
@@ -867,56 +808,28 @@ unsigned char *perm;
 
 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
 {
-int flag = PROT_READ | PROT_WRITE;
 uint page_size = mgr->page_size;
-uint off = 0, segment, fragment;
-unsigned char *perm;
 
   if( leaf )
        page_size <<= mgr->leaf_xtra;
 
-  fragment = page_no & 0xffff;
-  segment = page_no >> 16;
-  mgr->writes++;
-
-  while( off < page_size ) {
-       if( fragment >> 16 )
-               segment++, fragment = 0;
-
-       if( segment < mgr->segments ) {
-         perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
-         memcpy (perm, (unsigned char *)page + off, mgr->page_size);
-         off += mgr->page_size;
-         fragment++;
-         continue;
-       }
-
-       bt_mutexlock (mgr->maps);
-
-       if( segment < mgr->segments ) {
-               bt_releasemutex (mgr->maps);
-               continue;
-       }
-
-       mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
-       mgr->segments++;
-       bt_releasemutex (mgr->maps);
-  }
+  if( pwrite(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
+       return mgr->err = BTERR_wrt;
 
+  __sync_fetch_and_add (&mgr->writes, 1);
   return 0;
 }
 
 //     set CLOCK bit in latch
 //     decrement pin count
 
-int value;
-
-void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
+void bt_unpinlatch (BtLatchSet *latch, uint dirty, ushort thread_no, uint line)
 {
        bt_mutexlock(latch->modify);
+       if( dirty )
+               latch->dirty = 1;
        latch->pin |= CLOCK_bit;
        latch->pin--;
-
        bt_releasemutex(latch->modify);
 }
 
@@ -1015,8 +928,8 @@ uint entry;
 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
 {
 uint hashidx = page_no % mgr->leafhash;
+uint entry, oldidx;
 BtLatchSet *latch;
-uint entry, idx;
 BtPage page;
 
   //  try to find our entry
@@ -1050,34 +963,18 @@ trynext:
   entry = bt_availleaf (mgr);
   latch = mgr->leafsets + entry;
 
-  idx = latch->page_no % mgr->leafhash;
+  page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
+  oldidx = latch->page_no % mgr->leafhash;
 
-  //  if latch is on a different hash chain
-  //   unlink from the old page_no chain
+  //  skip over this entry if old latch is not available
 
   if( latch->page_no )
-   if( idx != hashidx ) {
-
-       //  skip over this entry if latch not available
-
-    if( !bt_mutextry (mgr->leaftable[idx].latch) ) {
+   if( oldidx != hashidx )
+    if( !bt_mutextry (mgr->leaftable[oldidx].latch) ) {
          bt_releasemutex(latch->modify);
          goto trynext;
        }
 
-       if( latch->prev )
-         mgr->leafsets[latch->prev].next = latch->next;
-       else
-         mgr->leaftable[idx].entry = latch->next;
-
-       if( latch->next )
-         mgr->leafsets[latch->next].prev = latch->prev;
-
-    bt_releasemutex (mgr->leaftable[idx].latch);
-   }
-
-  page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
-
   //  update permanent page area in btree from buffer pool
   //   no read-lock is required since page is not pinned.
 
@@ -1087,8 +984,24 @@ trynext:
        else
          latch->dirty = 0;
 
+  //  if latch is on a different hash chain
+  //   unlink from the old page_no chain
+
+  if( latch->page_no )
+   if( oldidx != hashidx ) {
+       if( latch->prev )
+         mgr->leafsets[latch->prev].next = latch->next;
+       else
+         mgr->leaftable[oldidx].entry = latch->next;
+
+       if( latch->next )
+         mgr->leafsets[latch->next].prev = latch->prev;
+
+    bt_releasemutex (mgr->leaftable[oldidx].latch);
+   }
+
   if( bt_readpage (mgr, page, page_no, 1) )
-       return mgr->line = __LINE__, NULL;
+       return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
 
   //  link page as head of hash table chain
   //  if this is a never before used entry,
@@ -1097,7 +1010,7 @@ trynext:
   //  leave it in its current hash table
   //  chain position.
 
-  if( !latch->page_no || hashidx != idx ) {
+  if( !latch->page_no || hashidx != oldidx ) {
        if( latch->next = mgr->leaftable[hashidx].entry )
          mgr->leafsets[latch->next].prev = entry;
 
@@ -1122,8 +1035,8 @@ trynext:
 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
 {
 uint hashidx = page_no % mgr->latchhash;
+uint entry, oldidx;
 BtLatchSet *latch;
-uint entry, idx;
 BtPage page;
 
   //  try to find our entry
@@ -1156,34 +1069,18 @@ trynext:
   entry = bt_availnext (mgr);
   latch = mgr->latchsets + entry;
 
-  idx = latch->page_no % mgr->latchhash;
+  page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
+  oldidx = latch->page_no % mgr->latchhash;
 
-  //  if latch is on a different hash chain
-  //   unlink from the old page_no chain
+  //  skip over this entry if latch not available
 
   if( latch->page_no )
-   if( idx != hashidx ) {
-
-       //  skip over this entry if latch not available
-
-    if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
+   if( oldidx != hashidx )
+    if( !bt_mutextry (mgr->hashtable[oldidx].latch) ) {
          bt_releasemutex(latch->modify);
          goto trynext;
        }
 
-       if( latch->prev )
-         mgr->latchsets[latch->prev].next = latch->next;
-       else
-         mgr->hashtable[idx].entry = latch->next;
-
-       if( latch->next )
-         mgr->latchsets[latch->next].prev = latch->prev;
-
-    bt_releasemutex (mgr->hashtable[idx].latch);
-   }
-
-  page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
-
   //  update permanent page area in btree from buffer pool
   //   no read-lock is required since page is not pinned.
 
@@ -1193,6 +1090,22 @@ trynext:
        else
          latch->dirty = 0;
 
+  //  if latch is on a different hash chain
+  //   unlink from the old page_no chain
+
+  if( latch->page_no )
+   if( oldidx != hashidx ) {
+       if( latch->prev )
+         mgr->latchsets[latch->prev].next = latch->next;
+       else
+         mgr->hashtable[oldidx].entry = latch->next;
+
+       if( latch->next )
+         mgr->latchsets[latch->next].prev = latch->prev;
+
+    bt_releasemutex (mgr->hashtable[oldidx].latch);
+   }
+
   if( bt_readpage (mgr, page, page_no, 0) )
        return mgr->line = __LINE__, NULL;
 
@@ -1203,7 +1116,7 @@ trynext:
   //  leave it in its current hash table
   //  chain position.
 
-  if( !latch->page_no || hashidx != idx ) {
+  if( !latch->page_no || hashidx != oldidx ) {
        if( latch->next = mgr->hashtable[hashidx].entry )
          mgr->latchsets[latch->next].prev = entry;
 
@@ -1224,11 +1137,12 @@ trynext:
   
 void bt_mgrclose (BtMgr *mgr)
 {
+char *name = mgr->type ? "Main" : "Cache";
 BtLatchSet *latch;
 BtLogHdr *eof;
 uint num = 0;
 BtPage page;
-uint slot;
+uint entry;
 
        //      flush previously written dirty pages
        //      and write recovery buffer to disk
@@ -1242,9 +1156,9 @@ uint slot;
 
        //      write remaining dirty pool pages to the btree
 
-       for( slot = 1; slot < mgr->latchtotal; slot++ ) {
-               page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
-               latch = mgr->latchsets + slot;
+       for( entry = 1; entry < mgr->latchtotal; entry++ ) {
+               page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
+               latch = mgr->latchsets + entry;
 
                if( latch->dirty ) {
                        bt_writepage(mgr, page, latch->page_no, 0);
@@ -1252,11 +1166,16 @@ uint slot;
                }
        }
 
+       if( num )
+         fprintf(stderr, "%s %d non-leaf buffer pool pages flushed\n", name, num);
+
+       num = 0;
+
        //      write remaining dirty leaf pages to the btree
 
-       for( slot = 1; slot < mgr->leaftotal; slot++ ) {
-               page = (BtPage)(((uid)slot << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
-               latch = mgr->leafsets + slot;
+       for( entry = 1; entry < mgr->leaftotal; entry++ ) {
+               page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
+               latch = mgr->leafsets + entry;
 
                if( latch->dirty ) {
                        bt_writepage(mgr, page, latch->page_no, 1);
@@ -1264,6 +1183,9 @@ uint slot;
                }
        }
 
+       if( num )
+         fprintf(stderr, "%s %d leaf buffer pool pages flushed\n", name, num);
+
        //      clear redo recovery buffer on disk.
 
        if( mgr->pagezero->redopages ) {
@@ -1274,11 +1196,8 @@ uint slot;
                  fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
        }
 
-       fprintf(stderr, "%d buffer pool pages flushed\n", num);
-
 #ifdef unix
-       while( mgr->segments )
-               munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
+       munmap (mgr->page0, (uid)(mgr->redopage + mgr->pagezero->redopages) << mgr->page_bits);
 
        munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
        munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
@@ -1294,7 +1213,6 @@ uint slot;
        close (mgr->idx);
        free (mgr);
 #else
-       VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
        FlushFileBuffers(mgr->idx);
        CloseHandle(mgr->idx);
        GlobalFree (mgr);
@@ -1311,7 +1229,7 @@ void bt_close (BtDb *bt)
 //  open/create new btree buffer manager
 
 //     call with file_name, BT_openmode, bits in page size (e.g. 16),
-//             size of page pool (e.g. 262144) and leaf pool
+//             size of page pool (e.g. 300) and leaf pool (e.g. 4000)
 
 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
 {
@@ -1369,6 +1287,7 @@ BtVal *val;
                        if( pagezero->page_bits ) {
                                pagebits = pagezero->page_bits;
                                leafxtra = pagezero->leaf_xtra;
+                               redopages = pagezero->redopages;
                        } else
                                initit = 1;
                else
@@ -1384,6 +1303,7 @@ BtVal *val;
                        return bt_mgrclose (mgr), NULL;
                pagebits = pagezero->page_bits;
                leafxtra = pagezero->leaf_xtra;
+               redopages = pagezero->redopages;
        } else
                initit = 1;
 #endif
@@ -1423,14 +1343,14 @@ BtVal *val;
        pagezero->page_bits = mgr->page_bits;
        pagezero->leaf_xtra = leafxtra;
 
-       bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
+       pagezero->alloc->right = mgr->redopage + pagezero->redopages;
        pagezero->upperpages = 1;
        pagezero->leafpages = 1;
 
        //  initialize left-most LEAF page in
        //      alloc->left and count of active leaf pages.
 
-       bt_putid (pagezero->alloc->left, LEAF_page);
+       pagezero->alloc->left = LEAF_page;
 
        if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
                fprintf (stderr, "Unable to create btree page zero\n");
@@ -1466,7 +1386,6 @@ BtVal *val;
                pagezero->alloc->lvl = lvl;
                pagezero->alloc->cnt = 2;
                pagezero->alloc->act = 1;
-               pagezero->alloc->page_no = MIN_lvl - lvl;
 
                if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
                        fprintf (stderr, "Unable to create btree page\n");
@@ -1484,19 +1403,18 @@ mgrlatch:
        // mlock the first segment of 64K pages
 
        flag = PROT_READ | PROT_WRITE;
-       mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
-       mgr->segments = 1;
+       mgr->page0 = mmap (0, (uid)(mgr->redopage + redopages) << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
 
-       if( mgr->pages[0] == MAP_FAILED ) {
-               fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
+       if( mgr->page0 == MAP_FAILED ) {
+               fprintf (stderr, "Unable to mmap pagezero btree segment, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
        }
 
-       mgr->pagezero = (BtPageZero *)mgr->pages[0];
+       mgr->pagezero = (BtPageZero *)mgr->page0;
        mlock (mgr->pagezero, mgr->page_size);
 
-       mgr->redobuff = mgr->pages[0] + (mgr->redopage << mgr->page_bits);
-       mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
+       mgr->redobuff = mgr->page0 + (mgr->redopage << mgr->page_bits);
+       mlock (mgr->redobuff, redopages << mgr->page_bits);
 
        //      allocate pool buffers
 
@@ -1520,6 +1438,11 @@ mgrlatch:
        mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
        mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
 
+       for( idx = 1; idx < mgr->leaftotal; idx++ ) {
+               latch = mgr->leafsets + idx;
+               latch->leaf = 1;
+       }
+
        return mgr;
 }
 
@@ -1620,34 +1543,34 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
 
 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
 {
-uint page_size = mgr->page_size, page_xtra = 0;
-unsigned char *freechain;
+uint page_size = mgr->page_size, leaf_xtra = 0;
+uid *freechain;
 uid page_no;
 
+       //      lock allocation page
+
+       bt_mutexlock(mgr->lock);
+
        if( contents->lvl ) {
-               freechain = mgr->pagezero->freechain;
+               freechain = &mgr->pagezero->freechain;
                mgr->pagezero->upperpages++;
        } else {
-               freechain = mgr->pagezero->leafchain;
+               freechain = &mgr->pagezero->leafchain;
                mgr->pagezero->leafpages++;
-               page_xtra = mgr->leaf_xtra;
-               page_size <<= page_xtra;
+               leaf_xtra = mgr->leaf_xtra;
+               page_size <<= leaf_xtra;
        }
 
-       //      lock allocation page
-
-       bt_mutexlock(mgr->lock);
-
        // use empty chain first
        // else allocate new page
 
-       if( page_no = bt_getid(freechain) ) {
+       if( page_no = *freechain ) {
                if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
                        set->page = bt_mappage (mgr, set->latch);
                else
                        return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
 
-               bt_putid(freechain, bt_getid(set->page->right));
+               *freechain = set->page->right;
 
                //  the page is currently nopromote and this
                //  will keep bt_promote out.
@@ -1655,9 +1578,7 @@ uid page_no;
                //      contents will replace this bit
                //  and pin will keep bt_promote out
 
-               contents->page_no = page_no;
                contents->nopromote = 0;
-               set->latch->dirty = 1;
 
                memcpy (set->page, contents, page_size);
 
@@ -1668,21 +1589,23 @@ uid page_no;
                return 0;
        }
 
-       page_no = bt_getid(mgr->pagezero->alloc->right);
-       bt_putid(mgr->pagezero->alloc->right, page_no+(1 << page_xtra));
+       page_no = mgr->pagezero->alloc->right;
+       mgr->pagezero->alloc->right += 1 << leaf_xtra;
 
        // unlock allocation latch and
        //      extend file into new page.
 
 //     if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
 //       fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+
        bt_releasemutex(mgr->lock);
 
        //      keep bt_promote out of this page
        contents->nopromote = 1;
-       contents->page_no = page_no;
+
        if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
                fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
+
        if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
                set->page = bt_mappage (mgr, set->latch);
        else
@@ -1691,7 +1614,6 @@ uid page_no;
        // now pin will keep bt_promote out
 
        set->page->nopromote = 0;
-       set->latch->dirty = 1;
        return 0;
 }
 
@@ -1704,7 +1626,7 @@ uint good = 0;
 
        //        make stopper key an infinite fence value
 
-       if( bt_getid (page->right) )
+       if( page->right )
                higher++;
        else
                good++;
@@ -1735,43 +1657,39 @@ int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint
 {
 uid page_no = ROOT_page, prevpage_no = 0;
 uint drill = 0xff, slot;
-BtLatchSet *prevlatch;
 uint mode, prevmode;
-BtPage prevpage;
+BtPageSet prev[1];
 BtVal *val;
 BtKey *ptr;
 
   //  start at root of btree and drill down
 
   do {
-       // determine lock mode of drill level
-       mode = (drill == lvl) ? lock : BtLockRead; 
-
        if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
          set->page = bt_mappage (mgr, set->latch);
        else
          return 0;
 
-       // obtain access lock using lock chaining with Access mode
-
        if( page_no > ROOT_page )
          bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
 
        //      release & unpin parent or left sibling page
 
        if( prevpage_no ) {
-         bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
-         bt_unpinlatch (prevlatch, thread_no, __LINE__);
+         bt_unlockpage(prevmode, prev->latch, thread_no, __LINE__);
+         bt_unpinlatch (prev->latch, 0, thread_no, __LINE__);
          prevpage_no = 0;
        }
 
        // obtain mode lock using lock coupling through AccessLock
+       // determine lock mode of drill level
 
+       mode = (drill == lvl) ? lock : BtLockRead; 
        bt_lockpage(mode, set->latch, thread_no, __LINE__);
 
        // grab our fence key
 
-       ptr=keyptr(set->page,set->page->cnt);
+       ptr=fenceptr(set->page);
 
        if( set->page->free )
                return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
@@ -1789,21 +1707,20 @@ BtKey *ptr;
 
                if( lock != BtLockRead && drill == lvl ) {
                  bt_unlockpage(mode, set->latch, thread_no, __LINE__);
-                 bt_unpinlatch (set->latch, thread_no, __LINE__);
+                 bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
                  continue;
                }
        }
 
        prevpage_no = set->latch->page_no;
-       prevlatch = set->latch;
-       prevpage = set->page;
        prevmode = mode;
+       *prev = *set;
 
        //  if requested key is beyond our fence,
        //      slide to the right
 
        if( keycmp (ptr, key, len) < 0 )
-         if( page_no = bt_getid(set->page->right) )
+         if( page_no = set->page->right )
                continue;
 
        //  if page is part of a delete operation,
@@ -1811,7 +1728,7 @@ BtKey *ptr;
 
        if( set->page->kill ) {
          bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
-         page_no = bt_getid(set->page->left);
+         page_no = set->page->left;
          bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
          continue;
        }
@@ -1844,7 +1761,7 @@ BtKey *ptr;
 
        //  slide right into next page
 
-       page_no = bt_getid(set->page->right);
+       page_no = set->page->right;
 
   } while( page_no );
 
@@ -1855,30 +1772,29 @@ BtKey *ptr;
 }
 
 //     return page to free list
-//     page must be delete & write locked
+//     page must be delete, link & write locked
 //     and have no keys pointing to it.
 
 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
 {
-unsigned char *freechain;
+uid *freechain;
+
+       //      lock allocation page
+
+       bt_mutexlock (mgr->lock);
 
        if( set->page->lvl ) {
-               freechain = mgr->pagezero->freechain;
+               freechain = &mgr->pagezero->freechain;
                mgr->pagezero->upperpages--;
        } else {
-               freechain = mgr->pagezero->leafchain;
+               freechain = &mgr->pagezero->leafchain;
                mgr->pagezero->leafpages--;
        }
 
-       //      lock allocation page
-
-       bt_mutexlock (mgr->lock);
+       //      store chain link
 
-       //      store chain
-
-       memcpy(set->page->right, freechain, BtId);
-       bt_putid(freechain, set->latch->page_no);
-       set->latch->dirty = 1;
+       set->page->right = *freechain;
+       *freechain = set->latch->page_no;
        set->page->free = 1;
 
 //     if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
@@ -1889,11 +1805,12 @@ unsigned char *freechain;
 
        bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
        bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
-       bt_unpinlatch (set->latch, thread_no, __LINE__);
+       bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
+       bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
        bt_releasemutex (mgr->lock);
 }
 
-//     a fence key was deleted from a page
+//     a fence key was deleted from an interiour level page
 //     push new fence value upwards
 
 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
@@ -1905,14 +1822,14 @@ uint idx;
 
        //      remove the old fence value
 
-       ptr = keyptr(set->page, set->page->cnt);
+       ptr = fenceptr(set->page);
        memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
        memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
-       set->latch->dirty = 1;
+       set->page->fence = slotptr(set->page, set->page->cnt)->off;
 
        //  cache new fence value
 
-       ptr = keyptr(set->page, set->page->cnt);
+       ptr = fenceptr(set->page);
        memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
 
        bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
@@ -1934,7 +1851,7 @@ uint idx;
                return mgr->err_thread = thread_no, mgr->err;
 
        bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
-       bt_unpinlatch(set->latch, thread_no, __LINE__);
+       bt_unpinlatch(set->latch, 1, thread_no, __LINE__);
        return 0;
 }
 
@@ -1971,14 +1888,12 @@ uint idx;
        bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
 
        memcpy (root->page, child->page, mgr->page_size);
-       root->latch->dirty = 1;
-
        bt_freepage (mgr, child, thread_no);
 
   } while( root->page->lvl > 1 && root->page->act == 1 );
 
   bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
-  bt_unpinlatch (root->latch, thread_no, __LINE__);
+  bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
   return 0;
 }
 
@@ -1991,7 +1906,7 @@ uint idx;
 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
 {
 unsigned char lowerfence[BT_keyarray];
-uint page_size = mgr->page_size;
+uint page_size = mgr->page_size, kill;
 BtPageSet right[1], temp[1];
 unsigned char value[BtId];
 uid page_no, right2;
@@ -2000,15 +1915,15 @@ BtKey *ptr;
        if( !lvl )
                page_size <<= mgr->leaf_xtra;
 
-       //  cache copy of original fence key
-       //      that is being deleted.
+       //  cache original copy of original fence key
+       //      that is going to be deleted.
 
-       ptr = keyptr(set->page, set->page->cnt);
+       ptr = fenceptr(set->page);
        memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
 
-       //      obtain locks on right page
+       //      pin and lock our right page
 
-       page_no = bt_getid(set->page->right);
+       page_no = set->page->right;
 
        if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
                right->page = bt_mappage (mgr, right->latch);
@@ -2017,69 +1932,70 @@ BtKey *ptr;
 
        bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
 
-       if( right->page->kill )
+       if( right->page->kill || set->page->kill )
                return mgr->line = __LINE__, mgr->err = BTERR_struct;
 
-       // pull contents of right peer into our empty page
+       // pull contents of right sibling over our empty page
        //      preserving our left page number, and its right page number.
 
        bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
-       page_no = bt_getid(set->page->left);
+       page_no = set->page->left;
        memcpy (set->page, right->page, page_size);
-       bt_putid (set->page->left, page_no);
+       set->page->left = page_no;
        bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
 
-       set->page->page_no = set->latch->page_no;
-       set->latch->dirty = 1;
-
        //  fix left link from far right page
 
-       if( right2 = bt_getid (right->page->right) ) {
+       if( right2 = set->page->right ) {
          if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
                temp->page = bt_mappage (mgr, temp->latch);
          else
                return 0;
 
+         bt_lockpage (BtLockAccess, temp->latch, thread_no, __LINE__);
       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
-         bt_putid (temp->page->left, set->latch->page_no);
+         temp->page->left = set->latch->page_no;
          bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
-         bt_unpinlatch (temp->latch, thread_no, __LINE__);
+         bt_unlockpage(BtLockAccess, temp->latch, thread_no, __LINE__);
+         bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
        } else if( !lvl ) {     // our page is now rightmost leaf
          bt_mutexlock (mgr->lock);
-         bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
+         mgr->pagezero->alloc->left = set->latch->page_no;
          bt_releasemutex(mgr->lock);
        }
 
-       // mark right page deleted and release lock
+       // mark right page as being deleted and release lock
 
-       right->latch->dirty = 1;
        right->page->kill = 1;
        bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
 
-       // redirect higher key directly to our new node contents
+       // redirect the new higher key directly to our new node
 
-       ptr = keyptr(right->page, right->page->cnt);
+       ptr = fenceptr(set->page);
        bt_putid (value, set->latch->page_no);
 
        if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
          return mgr->err;
 
-       //      delete our orignal fence key in parent
-       //      and unlock our page.
+       //      delete our original fence key in parent
 
        ptr = (BtKey *)lowerfence;
 
        if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
          return mgr->err;
 
-       bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
-       bt_unpinlatch (set->latch, thread_no, __LINE__);
-
-       //      obtain delete and write locks to right node
+       //  wait for all access to drain away with delete lock,
+       //      then obtain write lock to right node and free it.
 
        bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
        bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
+       bt_lockpage (BtLockLink, right->latch, thread_no, __LINE__);
        bt_freepage (mgr, right, thread_no);
+
+       //      release write lock to our node
+
+       bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
+       bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
        return 0;
 }
 
@@ -2136,27 +2052,20 @@ BtVal *val;
        //      did we delete a fence key in an upper level?
 
        if( lvl && set->page->act && fence )
-         if( bt_fixfence (mgr, set, lvl, thread_no) )
-               return mgr->err_thread = thread_no, mgr->err;
-         else
-               return 0;
+         return bt_fixfence (mgr, set, lvl, thread_no);
 
        //      do we need to collapse root?
 
-       if( set->latch->page_no == ROOT_page && set->page->act == 1 )
-         if( bt_collapseroot (mgr, set, thread_no) )
-               return mgr->err_thread = thread_no, mgr->err;
-         else
-               return 0;
+       if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
+         return bt_collapseroot (mgr, set, thread_no);
 
        //      delete empty page
 
        if( !set->page->act )
          return bt_deletepage (mgr, set, thread_no, set->page->lvl);
 
-       set->latch->dirty = 1;
        bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
-       bt_unpinlatch (set->latch, thread_no, __LINE__);
+       bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
        return 0;
 }
 
@@ -2194,14 +2103,13 @@ BtVal *val;
        // skip page info and set rest of page to zero
 
        memset (page+1, 0, page_size - sizeof(*page));
-       set->latch->dirty = 1;
 
        page->min = page_size;
        page->garbage = 0;
        page->act = 0;
 
        // clean up page first by
-       // removing deleted keys
+       // removing dead keys
 
        while( cnt++ < max ) {
                if( cnt == slot )
@@ -2235,9 +2143,10 @@ BtVal *val;
                slotptr(page, idx)->type = slotptr(frame, cnt)->type;
 
                if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
-                       page->act++;
+                 page->act++;
        }
 
+       page->fence = page->min;
        page->cnt = idx;
        free (frame);
 
@@ -2267,7 +2176,7 @@ BtVal *val;
 
        //      save left page fence key for new root
 
-       ptr = keyptr(root->page, root->page->cnt);
+       ptr = fenceptr(root->page);
        memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
 
        //  Obtain an empty page to use, and copy the current
@@ -2277,13 +2186,13 @@ BtVal *val;
                return mgr->err_thread = thread_no, mgr->err;
 
        left_page_no = left->latch->page_no;
-       bt_unpinlatch (left->latch, thread_no, __LINE__);
+       bt_unpinlatch (left->latch, 1, thread_no, __LINE__);
        free (frame);
 
        //      left link the pages together
 
        page = bt_mappage (mgr, right);
-       bt_putid (page->left, left_page_no);
+       page->left = left_page_no;
 
        // preserve the page info at the bottom
        // of higher keys and set rest to zero
@@ -2300,6 +2209,8 @@ BtVal *val;
        val->len = BtId;
 
        nxt -= 2 + sizeof(BtKey);
+       page->fence = nxt;
+
        slotptr(root->page, 2)->off = nxt;
        ptr = (BtKey *)((unsigned char *)root->page + nxt);
        ptr->len = 2;
@@ -2319,7 +2230,7 @@ BtVal *val;
        slotptr(root->page, 1)->off = nxt;
        memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
        
-       bt_putid(root->page->right, 0);
+       root->page->right = 0;
        root->page->min = nxt;          // reset lowest used offset and key count
        root->page->cnt = 2;
        root->page->act = 2;
@@ -2330,9 +2241,9 @@ BtVal *val;
        // release and unpin root pages
 
        bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
-       bt_unpinlatch (root->latch, thread_no, __LINE__);
+       bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
 
-       bt_unpinlatch (right, thread_no, __LINE__);
+       bt_unpinlatch (right, 1, thread_no, __LINE__);
        return 0;
 }
 
@@ -2391,20 +2302,21 @@ uint prev;
                slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
 
                if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
-                       frame->act++;
+                 frame->act++;
        }
 
+       frame->fence = frame->min;
        frame->cnt = idx;
        frame->lvl = lvl;
 
        // link right node
 
        if( set->latch->page_no > ROOT_page ) {
-               right2 = bt_getid (set->page->right);
-               bt_putid (frame->right, right2);
+               right2 = set->page->right;
+               frame->right = right2;
 
                if( linkleft )
-                       bt_putid (frame->left, set->latch->page_no);
+                       frame->left = set->latch->page_no;
        }
 
        //      get new free page and write higher keys to it.
@@ -2422,12 +2334,12 @@ uint prev;
                return 0;
 
       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
-         bt_putid (temp->page->left, right->latch->page_no);
+         temp->page->left = right->latch->page_no;
          bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
-         bt_unpinlatch (temp->latch, thread_no, __LINE__);
+         bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
         } else if( !lvl ) {    // page is rightmost leaf
          bt_mutexlock (mgr->lock);
-         bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
+         mgr->pagezero->alloc->left = right->latch->page_no;
          bt_releasemutex(mgr->lock);
         }
 
@@ -2435,7 +2347,6 @@ uint prev;
 
        memcpy (frame, set->page, page_size);
        memset (set->page+1, 0, page_size - sizeof(*set->page));
-       set->latch->dirty = 1;
 
        set->page->min = page_size;
        set->page->garbage = 0;
@@ -2470,7 +2381,8 @@ uint prev;
                set->page->act++;
        }
 
-       bt_putid(set->page->right, right->latch->page_no);
+       set->page->right = right->latch->page_no;
+       set->page->fence = set->page->min;
        set->page->cnt = idx;
        free(frame);
 
@@ -2497,29 +2409,29 @@ uid right2;
        if( set->latch->page_no == ROOT_page )
                return bt_splitroot (mgr, set, right, thread_no);
 
-       ptr = keyptr(set->page, set->page->cnt);
+       ptr = fenceptr(set->page);
        memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
 
        page = bt_mappage (mgr, right);
 
-       ptr = keyptr(page, page->cnt);
+       ptr = fenceptr(page);
        memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
 
        //      splice in far right page's left page_no
 
-       if( right2 = bt_getid (page->right) ) {
+       if( right2 = page->right ) {
          if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
                temp->page = bt_mappage (mgr, temp->latch);
          else
                return 0;
 
       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
-         bt_putid (temp->page->left, right->page_no);
+         temp->page->left = right->page_no;
          bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
-         bt_unpinlatch (temp->latch, thread_no, __LINE__);
+         bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
        } else if( !lvl ) {  // right page is far right page
          bt_mutexlock (mgr->lock);
-         bt_putid (mgr->pagezero->alloc->left, right->page_no);
+         mgr->pagezero->alloc->left = right->page_no;
          bt_releasemutex(mgr->lock);
        }
        // insert new fences in their parent pages
@@ -2546,10 +2458,10 @@ uid right2;
                return mgr->err_thread = thread_no, mgr->err;
 
        bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
-       bt_unpinlatch (set->latch, thread_no, __LINE__);
+       bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
 
        bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
-       bt_unpinlatch (right, thread_no, __LINE__);
+       bt_unpinlatch (right, 1, thread_no, __LINE__);
        return 0;
 }
 
@@ -2637,7 +2549,6 @@ int rate;
                --idx;
        }
 
-       set->latch->dirty = 1;
        set->page->act++;
 
        //      fill in new slot
@@ -2715,7 +2626,6 @@ BtVal *val;
          node->dead = 0;
 
          set->page->garbage += val->len - vallen;
-         set->latch->dirty = 1;
          val->len = vallen;
          memcpy (val->value, value, vallen);
          goto insxit;
@@ -2749,7 +2659,6 @@ BtVal *val;
   memcpy (val->value, value, vallen);
   val->len = vallen;
 
-  set->latch->dirty = 1;
   set->page->min -= keylen + sizeof(BtKey);
   ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
   memcpy (ptr->key, key, keylen);
@@ -2759,23 +2668,23 @@ BtVal *val;
 
 insxit:
   bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
-  bt_unpinlatch (set->latch, thread_no, __LINE__);
+  bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
   return 0;
 }
 
 //     determine actual page where key is located
 //  return slot number
 
-uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
+uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, BtPageSet *set)
 {
-BtKey *key = keyptr(source,src), *ptr;
-uint slot = locks[src].slot;
+BtKey *key = keyptr(source,locks[idx].src), *ptr;
+uint slot = locks[idx].slot;
 uint entry;
 
-       if( locks[src].reuse )
-         entry = locks[src-1].entry;
+       if( locks[idx].reuse )
+         entry = locks[idx-1].entry;
        else
-         entry = locks[src].entry;
+         entry = locks[idx].entry;
 
        if( slot ) {
                set->latch = mgr->leafsets + entry;
@@ -2794,8 +2703,8 @@ uint entry;
                if( slot = bt_findslot(set->page, key->key, key->len) ) {
                  if( slotptr(set->page, slot)->type == Librarian )
                        slot++;
-                 if( locks[src].reuse )
-                       locks[src].entry = entry;
+                 if( locks[idx].reuse )
+                       locks[idx].entry = entry;
                  return slot;
                }
        } while( entry = set->latch->split );
@@ -2804,17 +2713,17 @@ uint entry;
        return 0;
 }
 
-BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
+BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, ushort thread_no, logseqno lsn)
 {
-BtKey *key = keyptr(source, src);
-BtVal *val = valptr(source, src);
+BtKey *key = keyptr(source, locks[idx].src);
+BtVal *val = valptr(source, locks[idx].src);
 BtLatchSet *latch;
 BtPageSet set[1];
 uint entry, slot;
 
-  while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
+  while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) {
        if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
-         if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
+         if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,locks[idx].src)->type) )
                return mgr->err_thread = thread_no, mgr->err;
 
          set->page->lsn = lsn;
@@ -2837,7 +2746,7 @@ uint entry, slot;
 
        // clear slot number for atomic page
 
-       locks[src].slot = 0;
+       locks[idx].slot = 0;
   }
 
   return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
@@ -2846,42 +2755,70 @@ uint entry, slot;
 //     perform delete from smaller btree
 //  insert a delete slot if not found there
 
-BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
+BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, ushort thread_no, logseqno lsn)
 {
-BtKey *key = keyptr(source, src);
+BtKey *key = keyptr(source, locks[idx].src);
+BtLatchSet *latch;
+uint slot, entry;
 BtPageSet set[1];
-uint idx, slot;
 BtSlot *node;
 BtKey *ptr;
 BtVal *val;
 
-       if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
-         node = slotptr(set->page, slot);
-         ptr = keyptr(set->page, slot);
-         val = valptr(set->page, slot);
-       } else
-         return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
+  while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) {
+       node = slotptr(set->page, slot);
+       ptr = keyptr(set->page, slot);
+       val = valptr(set->page, slot);
 
-       //  if slot is not found, insert a delete slot
+       //  if slot is not found on cache btree, insert a delete slot
+       //      otherwise ignore the request.
 
        if( keycmp (ptr, key->key, key->len) )
-         if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
-               return mgr->err;
+        if( !mgr->type )
+         if( slot = bt_cleanpage(mgr, set, key->len, slot, 0) )
+               return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete);
+         else { //  split page before inserting Delete slot
+               if( entry = bt_splitpage (mgr, set, thread_no, 0) )
+                 latch = mgr->leafsets + entry;
+               else
+             return mgr->err;
+
+               //      splice right page into split chain
+               //      and WriteLock it
+
+               bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
+               latch->split = set->latch->split;
+               set->latch->split = entry;
+
+               // clear slot number for atomic page
+
+               locks[idx].slot = 0;
+               continue;
+         }
+        else
+          return 0;
 
        //      if node is already dead,
        //      ignore the request.
 
-       if( node->dead )
+       if( node->type == Delete || node->dead )
                return 0;
 
-       set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
-       set->latch->dirty = 1;
-       set->page->lsn = lsn;
-       set->page->act--;
+       //  if main LSM btree, delete the slot
+       //      else change to delete type.
+
+       if( mgr->type ) {
+               set->page->act--;
+               node->dead = 1;
+       } else
+               node->type = Delete;
 
-       node->dead = 0;
        __sync_fetch_and_add(&mgr->found, 1);
+       set->page->lsn = lsn;
        return 0;
+  }
+
+  return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
 }
 
 //     release master's splits from right to left
@@ -2895,7 +2832,7 @@ BtLatchSet *latch = mgr->leafsets + entry;
 
        latch->split = 0;
        bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
-       bt_unpinlatch(latch, thread_no, __LINE__);
+       bt_unpinlatch(latch, 1, thread_no, __LINE__);
 }
 
 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
@@ -2943,7 +2880,7 @@ int type;
 
   //  perform the individual actions in the transaction
 
-  if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
+  if( bt_atomicexec (bt->mgr, source, source->cnt, lsn, 0, bt->thread_no) )
        return bt->mgr->err;
 
   // if number of active pages
@@ -2962,9 +2899,9 @@ int type;
 
 //     execute the source list of inserts/deletes
 
-BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
+BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, logseqno lsn, int lsm, ushort thread_no)
 {
-uint slot, src, idx, samepage, entry;
+uint slot, src, idx, samepage, entry, outidx;
 BtPageSet set[1], prev[1];
 unsigned char value[BtId];
 BtLatchSet *latch;
@@ -2974,24 +2911,29 @@ BtKey *key, *ptr;
 BtPage page;
 BtVal *val;
 
-  locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
+  locks = calloc (count, sizeof(AtomicTxn));
+  memset (set, 0, sizeof(BtPageSet));
+  outidx = 0;
 
   // Load the leaf page for each key
   // group same page references with reuse bit
 
-  for( src = 0; src++ < source->cnt; ) {
+  for( src = 0; src++ < count; ) {
+       if( slotptr(source,src)->dead )
+         continue;
+
        key = keyptr(source, src);
 
        // first determine if this modification falls
        // on the same page as the previous modification
        //      note that the far right leaf page is a special case
 
-       if( samepage = src > 1 )
-         samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
+       if( samepage = !!set->page )
+         samepage = !set->page->right || keycmp (ptr, key->key, key->len) >= 0;
 
        if( !samepage )
          if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
-               ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
+               ptr = fenceptr(set->page), set->latch->split = 0;
          else
                return mgr->err;
        else
@@ -3002,22 +2944,23 @@ BtVal *val;
          slot++;
 
        entry = set->latch - mgr->leafsets;
-       locks[src].reuse = samepage;
-       locks[src].entry = entry;
-       locks[src].slot = slot;
+       locks[outidx].reuse = samepage;
+       locks[outidx].entry = entry;
+       locks[outidx].slot = slot;
+       locks[outidx].src = src;
 
        //      capture current lsn for master page
 
-       locks[src].reqlsn = set->page->lsn;
+       locks[outidx++].reqlsn = set->page->lsn;
   }
 
   // insert or delete each key
   // process any splits or merges
   // run through txn list backwards
 
-  samepage = source->cnt + 1;
+  samepage = outidx;
 
-  for( src = source->cnt; src; src-- ) {
+  for( src = outidx; src--; ) {
        if( locks[src].reuse )
          continue;
 
@@ -3026,7 +2969,7 @@ BtVal *val;
        //  the same page
 
        for( idx = src; idx < samepage; idx++ )
-        switch( slotptr(source,idx)->type ) {
+        switch( slotptr(source,locks[idx].src)->type ) {
         case Delete:
          if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
                return mgr->err;
@@ -3062,11 +3005,11 @@ BtVal *val;
          //  note that there are no pointers to it yet
 
          if( !prev->page->act ) {
-               memcpy (set->page->left, prev->page->left, BtId);
+               set->page->left = prev->page->left;
                memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
                bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
+               bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
                prev->latch->split = set->latch->split;
-               prev->latch->dirty = 1;
                bt_freepage (mgr, set, thread_no);
                continue;
          }
@@ -3076,17 +3019,18 @@ BtVal *val;
          // thread has its page number yet.
 
          if( !set->page->act ) {
-               memcpy (prev->page->right, set->page->right, BtId);
+               prev->page->right = set->page->right;
                prev->latch->split = set->latch->split;
 
                bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
+               bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
                bt_freepage (mgr, set, thread_no);
                continue;
          }
 
          //  update prev's fence key
 
-         ptr = keyptr(prev->page,prev->page->cnt);
+         ptr = fenceptr(prev->page);
          bt_putid (value, prev->latch->page_no);
 
          if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
@@ -3094,11 +3038,7 @@ BtVal *val;
 
          // splice in the left link into the split page
 
-         bt_putid (set->page->left, prev->latch->page_no);
-
-         if( lsm )
-               bt_syncpage (mgr, prev->page, prev->latch);
-
+         set->page->left = prev->latch->page_no;
          *prev = *set;
        }
 
@@ -3109,36 +3049,31 @@ BtVal *val;
          //  fix left pointer in master's original (now split)
          //  far right sibling or set rightmost page in page zero
 
-         if( right_page_no = bt_getid (prev->page->right) ) {
+         if( right_page_no = prev->page->right ) {
                if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
                  set->page = bt_mappage (mgr, set->latch);
                else
                  return mgr->err;
 
            bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
-           bt_putid (set->page->left, prev->latch->page_no);
-               set->latch->dirty = 1;
-
+           set->page->left = prev->latch->page_no;
            bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
-               bt_unpinlatch (set->latch, thread_no, __LINE__);
+               bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
          } else {      // prev is rightmost page
            bt_mutexlock (mgr->lock);
-               bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
+               mgr->pagezero->alloc->left = prev->latch->page_no;
            bt_releasemutex(mgr->lock);
          }
 
          //  switch the original fence key from the
          //  master page to the last split page.
 
-         ptr = keyptr(prev->page,prev->page->cnt);
+         ptr = fenceptr(prev->page);
          bt_putid (value, prev->latch->page_no);
 
          if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
                return mgr->err;
 
-         if( lsm )
-               bt_syncpage (mgr, prev->page, prev->latch);
-
          //  unlock and unpin the split pages
 
          bt_atomicrelease (mgr, latch->split, thread_no);
@@ -3147,7 +3082,7 @@ BtVal *val;
 
          latch->split = 0;
          bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
-         bt_unpinlatch(latch, thread_no, __LINE__);
+         bt_unpinlatch(latch, 1, thread_no, __LINE__);
          continue;
        }
 
@@ -3156,11 +3091,7 @@ BtVal *val;
 
        if( prev->page->act ) {
          bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
-
-         if( lsm )
-               bt_syncpage (mgr, prev->page, prev->latch);
-
-         bt_unpinlatch(prev->latch, thread_no, __LINE__);
+         bt_unpinlatch(prev->latch, 1, thread_no, __LINE__);
          continue;
        }
 
@@ -3171,6 +3102,16 @@ BtVal *val;
                return mgr->err;
   }
 
+  // delete the slots
+
+  for( idx = 0; idx++ < count; ) {
+       if( slotptr(source,idx)->dead )
+         continue;
+
+       slotptr(source,idx)->dead = 1;
+       source->act--;
+  }
+
   free (locks);
   return 0;
 }
@@ -3217,14 +3158,14 @@ BtVal *val;
 
        // entry has no right sibling
 
-       if( !bt_getid (set->page->right) ) {
+       if( !set->page->right ) {
          bt_releasemutex(set->latch->modify);
          continue;
        }
 
-       // entry interiour node or being killed or constructed
+       // entry is being killed or constructed
 
-       if( set->page->lvl || set->page->nopromote || set->page->kill ) {
+       if( set->page->nopromote || set->page->kill ) {
          bt_releasemutex(set->latch->modify);
          continue;
        }
@@ -3238,10 +3179,11 @@ BtVal *val;
        bt_releasemutex(set->latch->modify);
 
        // transfer slots in our selected page to the main btree
+
 if( !(entry % 100) )
 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
 
-       if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
+       if( bt_atomicexec (bt->main, set->page, set->page->cnt, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
                fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
                return bt->main->err;
        }
@@ -3283,15 +3225,15 @@ uint slot;
        //  or the key doesn't match what's on the page.
 
        if( slot == set->page->cnt )
-         if( !bt_getid (set->page->right) ) {
+         if( !set->page->right ) {
                bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
-               bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+               bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
                continue;
        }
 
        if( keycmp (ptr, key, keylen) ) {
                bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
-               bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+               bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
                continue;
        }
 
@@ -3318,17 +3260,17 @@ uint slot;
 findxit:
   if( type < 2 ) {
        bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
-       bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+       bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
   }
   return ret;
 }
 
-//     set cursor to highest slot on left-most page
+//     set cursor to highest slot on right-most page
 
 BTERR bt_lastkey (BtDb *bt)
 {
-uid cache_page_no = bt_getid (bt->mgr->pagezero->alloc->left);
-uid main_page_no = bt_getid (bt->main->pagezero->alloc->left);
+uid cache_page_no = bt->mgr->pagezero->alloc->left;
+uid main_page_no = bt->main->pagezero->alloc->left;
 
        if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) )
                bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
@@ -3362,14 +3304,14 @@ uid next, us = set->latch->page_no;
          else
                return slot;
 
-       next = bt_getid(set->page->left);
+       next = set->page->left;
 
        if( !next )
                return 0;
 
        do {
          bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
-         bt_unpinlatch (set->latch, thread_no, __LINE__);
+         bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
 
          if( set->latch = bt_pinleaf (mgr, next, thread_no) )
            set->page = bt_mappage (mgr, set->latch);
@@ -3377,7 +3319,7 @@ uid next, us = set->latch->page_no;
            return 0;
 
       bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
-         next = bt_getid (set->page->right);
+         next = set->page->right;
 
        } while( next != us );
 
@@ -3469,15 +3411,15 @@ uid page_no;
        while( slot++ < set->page->cnt )
          if( slotptr(set->page, slot)->dead )
                continue;
-         else if( slot < set->page->cnt || bt_getid (set->page->right) )
+         else if( slot < set->page->cnt || set->page->right )
                return slot;
          else
                return 0;
 
        bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
-       bt_unpinlatch (set->latch, thread_no, __LINE__);
+       bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
 
-       if( page_no = bt_getid(set->page->right) )
+       if( page_no = set->page->right )
          if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
                set->page = bt_mappage (mgr, set->latch);
          else
@@ -3540,6 +3482,7 @@ int cmp;
          return 0;
 
        //  main key is larger
+       //      return smaller key
 
        if( cmp < 0 ) {
          bt->phase = 0;
@@ -3591,6 +3534,48 @@ uint slot;
        return 0;
 }
 
+//     flush cache pages to main btree
+
+BTERR bt_flushmain (BtDb *bt)
+{
+uint count, cnt = 0;
+BtPageSet set[1];
+
+  while( bt->mgr->pagezero->leafpages > 0 ) {
+       if( set->latch = bt_pinleaf (bt->mgr, LEAF_page, bt->thread_no) )
+         set->page = bt_mappage (bt->mgr, set->latch);
+       else
+         return bt->mgr->err;
+
+       bt_lockpage(BtLockWrite, set->latch, bt->thread_no, __LINE__);
+       count = set->page->cnt;
+
+       if( !set->page->right )
+               count--;
+
+if( !(cnt++ % 100) )
+fprintf(stderr, "Promote LEAF_page %d with %d keys\n", cnt, set->page->act);
+
+       if( bt_atomicexec (bt->main, set->page, count, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
+         return bt->mgr->line = bt->main->line, bt->mgr->err = bt->main->err;
+
+       if( set->page->right )
+         if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
+               return bt->mgr->err;
+         else
+               continue;
+
+       bt_unlockpage(BtLockWrite, set->latch, bt->thread_no, __LINE__);
+       bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
+       return 0;
+  }
+
+  //  leaf page count is off
+
+  bt->mgr->err_thread = bt->thread_no, bt->mgr->line = __LINE__;
+  return bt->mgr->err = BTERR_ovflw;
+}
+
 #ifdef STANDALONE
 
 #ifndef unix
@@ -3670,6 +3655,9 @@ uint entry;
        for( entry = 0; ++entry < mgr->latchtotal; ) {
                latch = mgr->latchsets + entry;
 
+               if( latch->leaf )
+                       fprintf(stderr, "%s latchset %d is a leaf on page %d\n", type, entry, latch->page_no);
+
                if( *latch->modify->value )
                        fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
 
@@ -3680,6 +3668,9 @@ uint entry;
        for( entry = 0; ++entry < mgr->leaftotal; ) {
                latch = mgr->leafsets + entry;
 
+               if( !latch->leaf )
+                       fprintf(stderr, "%s leafset %d is not a leaf on page %d\n", type, entry, latch->page_no);
+
                if( *latch->modify->value )
                        fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
 
@@ -3713,11 +3704,13 @@ unsigned char key[BT_maxkey];
 unsigned char buff[65536];
 uint nxt = sizeof(buff);
 ThreadArg *args = arg;
-BtPage page, frame;
+uint counts[8][2];
 BtPageSet set[1];
+BtPage page;
 int vallen;
 BtKey *ptr;
 BtVal *val;
+uint size;
 BtDb *bt;
 FILE *in;
 
@@ -3731,6 +3724,15 @@ FILE *in;
 
        switch(ch | 0x20)
        {
+       case 'm':
+         fprintf(stderr, "started flushing cache to main btree\n");
+
+         if( bt->main )
+               if( bt_flushmain(bt) )
+                 fprintf(stderr, "Error %d Line: %d thread: %d\n", bt->mgr->err, bt->mgr->line, bt->thread_no), exit(0);
+
+         break;
+
        case 'd':
                type = Delete;
 
@@ -3772,6 +3774,7 @@ FILE *in;
                          buff[nxt] = 10;
                          slotptr(page,++cnt)->off  = nxt;
                          slotptr(page,cnt)->type = type;
+                         slotptr(page,cnt)->dead = 0;
                          len = 0;
 
                          if( cnt < args->num )
@@ -3856,10 +3859,10 @@ FILE *in;
            }
 
                bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
-               bt_unpinlatch (bt->cacheset->latch, bt->thread_no, __LINE__);
+               bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
 
                bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
-               bt_unpinlatch (bt->mainset->latch, bt->thread_no, __LINE__);
+               bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
 
                fprintf(stderr, " Total keys read %d\n", cnt);
                break;
@@ -3893,55 +3896,90 @@ FILE *in;
            }
 
                bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
-               bt_unpinlatch (bt->cacheset->latch, bt->thread_no, __LINE__);
+               bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
 
                bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
-               bt_unpinlatch (bt->mainset->latch, bt->thread_no, __LINE__);
+               bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
 
                fprintf(stderr, " Total keys read %d\n", cnt);
                break;
 
        case 'c':
                fprintf(stderr, "started counting LSM cache btree\n");
+               next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
+               memset (counts, 0, sizeof(counts));
                page_no = LEAF_page;
+               size = bt->mgr->page_size << bt->mgr->leaf_xtra;
+               page = malloc(size);
 
-               do {
-                 if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
-                       set->page = bt_mappage (bt->mgr, set->latch);
+#ifdef unix
+               posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
+#endif
+               while( page_no < bt->mgr->pagezero->alloc->right ) {
+                 pread(bt->mgr->idx, page, size, page_no << bt->mgr->page_bits);
+                 if( !page->lvl && !page->free ) {
+                   cnt += page->act;
+
+                       for( idx = 0; idx++ < page->cnt; ) {
+                       BtSlot *node = slotptr (page, idx);
+                         counts[node->type][node->dead]++;
+                       }
+                 }
+                 if( next )
+                       page_no = next;
                  else
-                       return 0;
-
-                 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-                 cnt += set->page->act;
-                 page_no = bt_getid (set->page->right);
-                 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-                 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
-               } while( page_no );
+                       page_no += page->lvl ? 1 : (1 << bt->mgr->leaf_xtra);
+                 next = 0;
+               }
                
                cachecnt = --cnt;       // remove stopper key
-               cnt = 0;
+               counts[Unique][0]--;
+
+               fprintf(stderr, "  Unique    : %d  dead: %d\n", counts[Unique][0], counts[Unique][1]);
+               fprintf(stderr, "  Duplicates: %d  dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]);
+               fprintf(stderr, "  Librarian : %d  dead: %d\n", counts[Librarian][0], counts[Librarian][1]);
+               fprintf(stderr, "  Deletion  : %d  dead: %d\n", counts[Delete][0], counts[Delete][1]);
+               fprintf(stderr, "total cache keys count: %d\n", cachecnt);
+               free (page);
 
                fprintf(stderr, "started counting LSM main btree\n");
+               next = bt->main->redopage + bt->main->pagezero->redopages;
+               memset (counts, 0, sizeof(counts));
+               size = bt->main->page_size << bt->main->leaf_xtra;
+               page = malloc(size);
                page_no = LEAF_page;
+               cnt = 0;
 
-               do {
-                 if( set->latch = bt_pinleaf (bt->main, page_no, bt->thread_no) )
-                       set->page = bt_mappage (bt->main, set->latch);
+#ifdef unix
+               posix_fadvise( bt->main->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
+#endif
+               while( page_no < bt->main->pagezero->alloc->right ) {
+                 pread(bt->main->idx, page, size, page_no << bt->main->page_bits);
+                 if( !page->lvl && !page->free ) {
+                   cnt += page->act;
+
+                       for( idx = 0; idx++ < page->cnt; ) {
+                       BtSlot *node = slotptr (page, idx);
+                         counts[node->type][node->dead]++;
+                       }
+                 }
+                 if( next )
+                       page_no = next;
                  else
-                       return 0;
-
-                 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-                 cnt += set->page->act;
-                 page_no = bt_getid (set->page->right);
-                 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-                 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
-               } while( page_no );
+                       page_no += page->lvl ? 1 : (1 << bt->main->leaf_xtra);
+                 next = 0;
+               }
                
                cnt--;  // remove stopper key
-
-               fprintf(stderr, " cache keys counted %d\n", cachecnt);
-               fprintf(stderr, " main keys counted %d\n", cnt);
-               fprintf(stderr, " Total keys counted %d\n", cnt + cachecnt);
+               counts[Unique][0]--;
+
+               fprintf(stderr, "  Unique    : %d  dead: %d\n", counts[Unique][0], counts[Unique][1]);
+               fprintf(stderr, "  Duplicates: %d  dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]);
+               fprintf(stderr, "  Librarian : %d  dead: %d\n", counts[Librarian][0], counts[Librarian][1]);
+               fprintf(stderr, "  Deletion  : %d  dead: %d\n", counts[Delete][0], counts[Delete][1]);
+               fprintf(stderr, "total main keys count : %d\n", cnt);
+               fprintf(stderr, "Total keys counted    : %d\n", cnt + cachecnt);
+               free (page);
                break;
        }
 
@@ -3985,7 +4023,7 @@ BtKey *ptr;
                fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
                fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
                fprintf (stderr, "  where main_file is the name of the main btree file\n");
-               fprintf (stderr, "  cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n");
+               fprintf (stderr, "  cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort/(c)ount/(m)ainflush, with a one character command for each input src_file. A command can also be given with no input file\n");
                fprintf (stderr, "  pagebits is the page size in bits for the cache btree\n");
                fprintf (stderr, "  leafbits is the number of xtra bits for a leaf page\n");
                fprintf (stderr, "  poolsize is the number of pages in buffer pool for the cache btree\n");
@@ -4050,7 +4088,8 @@ BtKey *ptr;
        if( !mgr ) {
                fprintf(stderr, "Index Open Error %s\n", argv[1]);
                exit (1);
-       }
+       } else
+               mgr->type = 0;
 
        if( mainbits ) {
                main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
@@ -4058,7 +4097,8 @@ BtKey *ptr;
                if( !main ) {
                        fprintf(stderr, "Index Open Error %s\n", argv[2]);
                        exit (1);
-               }
+               } else
+                       main->type = 1;
        } else
                main = NULL;
 
@@ -4109,9 +4149,10 @@ BtKey *ptr;
        if( main )
                fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
 
+       bt_mgrclose (mgr);
+
        if( main )
                bt_mgrclose (main);
-       bt_mgrclose (mgr);
 
        elapsed = getCpuTime(0) - start;
        fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
@@ -4121,6 +4162,11 @@ BtKey *ptr;
        fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
 }
 
+BtKey *bt_fence (BtPage page)
+{
+return fenceptr(page);
+}
+
 BtKey *bt_key (BtPage page, uint slot)
 {
 return keyptr(page,slot);