]> pd.if.org Git - btree/blobdiff - threadskv10.c
More progress on the LSM btrees implementation
[btree] / threadskv10.c
index 129434fda0f8b8756ca60492ac578a408c3cb71d..62ee44e3d69fee2b61f67b2cd60044f4b08761a4 100644 (file)
@@ -110,28 +110,6 @@ typedef enum{
        BtLockAtomic = 32
 } BtLock;
 
-//     lite weight spin latch
-
-volatile typedef struct {
-       ushort exclusive:1;
-       ushort pending:1;
-       ushort share:14;
-} BtMutexLatch;
-
-#define XCL 1
-#define PEND 2
-#define BOTH 3
-#define SHARE 4
-
-/*
-// exclusive is set for write access
-
-volatile typedef struct {
-       unsigned char exclusive[1];
-       unsigned char filler;
-} BtMutexLatch;
-*/
-/*
 typedef struct {
   union {
        struct {
@@ -141,7 +119,7 @@ typedef struct {
        uint value[1];
   };
 } BtMutexLatch;
-*/
+
 #define XCL 1
 #define WRT 2
 
@@ -192,7 +170,6 @@ typedef struct {
        WOLock parent[1];               // Posting of fence key in parent
        WOLock atomic[1];               // Atomic update in progress
        uint split;                             // right split page atomic insert
-       uint entry;                             // entry slot in latch table
        uint next;                              // next entry in hash table chain
        uint prev;                              // prev entry in hash table chain
        ushort pin;                             // number of accessing threads
@@ -262,7 +239,7 @@ typedef struct {
 
 //     note that this structure size
 //     must be a multiple of 8 bytes
-//     in order to place dups correctly.
+//     in order to place PageZero correctly.
 
 typedef struct BtPage_ {
        uint cnt;                                       // count of keys in page
@@ -291,9 +268,9 @@ typedef struct {
 
 typedef struct {
        struct BtPage_ alloc[1];                // next page_no in right ptr
-       unsigned long long dups[1];             // global duplicate key uniqueifier
        unsigned char freechain[BtId];  // head of free page_nos chain
-       unsigned long long activepages; // number of active pages pages
+       unsigned long long activepages; // number of active pages
+       uint redopages;                                 // number of redo pages in file
 } BtPageZero;
 
 //     The object structure for Btree access
@@ -321,7 +298,6 @@ typedef struct {
        uint latchhash;                         // number of latch hash table slots
        uint latchvictim;                       // next latch entry to examine
        uint latchpromote;                      // next latch entry to promote
-       uint redopages;                         // size of recovery buff in pages
        uint redolast;                          // last msync size of recovery buff
        uint redoend;                           // eof/end element in recovery buff
        int err;                                        // last error
@@ -387,7 +363,7 @@ typedef struct {
 
 extern void bt_close (BtDb *bt);
 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
-extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit);
+extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
@@ -479,15 +455,6 @@ int i;
        return id;
 }
 
-uid bt_newdup (BtMgr *mgr)
-{
-#ifdef unix
-       return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1;
-#else
-       return _InterlockedIncrement64(mgr->pagezero->dups, 1);
-#endif
-}
-
 //     lite weight spin lock Latch Manager
 
 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
@@ -497,46 +464,6 @@ int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *add
 
 void bt_mutexlock(BtMutexLatch *latch)
 {
-ushort prev;
-
-  do {
-#ifdef  unix
-       prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
-#else
-       prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
-#endif
-       if( !(prev & XCL) )
-         if( !(prev & ~BOTH) )
-               return;
-         else
-#ifdef unix
-               __sync_fetch_and_and ((ushort *)latch, ~XCL);
-#else
-               _InterlockedAnd16((ushort *)latch, ~XCL);
-#endif
-#ifdef  unix
-  } while( sched_yield(), 1 );
-#else
-  } while( SwitchToThread(), 1 );
-#endif
-/*
-unsigned char prev;
-
-  do {
-#ifdef  unix
-       prev = __sync_fetch_and_or(latch->exclusive, XCL);
-#else
-       prev = _InterlockedOr8(latch->exclusive, XCL);
-#endif
-       if( !(prev & XCL) )
-               return;
-#ifdef  unix
-  } while( sched_yield(), 1 );
-#else
-  } while( SwitchToThread(), 1 );
-#endif
-*/
-/*
 BtMutexLatch prev[1];
 uint slept = 0;
 
@@ -556,7 +483,7 @@ uint slept = 0;
 
        sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
        slept = 1;
-  } */
+  }
 }
 
 //     try to obtain write lock
@@ -566,38 +493,6 @@ uint slept = 0;
 
 int bt_mutextry(BtMutexLatch *latch)
 {
-ushort prev;
-
-#ifdef  unix
-       prev = __sync_fetch_and_or((ushort *)latch, XCL);
-#else
-       prev = _InterlockedOr16((ushort *)latch, XCL);
-#endif
-       //      take write access if all bits are clear
-
-       if( !(prev & XCL) )
-         if( !(prev & ~BOTH) )
-               return 1;
-         else
-#ifdef unix
-               __sync_fetch_and_and ((ushort *)latch, ~XCL);
-#else
-               _InterlockedAnd16((ushort *)latch, ~XCL);
-#endif
-       return 0;
-/*
-unsigned char prev;
-
-#ifdef  unix
-       prev = __sync_fetch_and_or(latch->exclusive, XCL);
-#else
-       prev = _InterlockedOr8(latch->exclusive, XCL);
-#endif
-       //      take write access if all bits are clear
-
-       return !(prev & XCL);
-*/
-/*
 BtMutexLatch prev[1];
 
        *prev->value = __sync_fetch_and_or(latch->value, XCL);
@@ -605,29 +500,18 @@ BtMutexLatch prev[1];
        //      take write access if exclusive bit is clear
 
        return !prev->bits->xlock;
-*/
 }
 
 //     clear write mode
 
 void bt_releasemutex(BtMutexLatch *latch)
 {
-#ifdef unix
-       __sync_fetch_and_and((ushort *)latch, ~BOTH);
-#else
-       _InterlockedAnd16((ushort *)latch, ~BOTH);
-#endif
-/*
-       *latch->exclusive = 0;
-*/
-/*
 BtMutexLatch prev[1];
 
        *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
 
        if( prev->bits->wrt )
          sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
-*/
 }
 
 //     Write-Only Queue Lock
@@ -781,9 +665,9 @@ void ReadRelease (RWLock *lock)
 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
 {
 uint cnt3 = 0, cnt2 = 0, cnt = 0;
+uint entry, segment;
 BtLatchSet *latch;
 BtPage page;
-uint entry;
 
   //   flush dirty pool pages to the btree
 
@@ -795,7 +679,7 @@ fprintf(stderr, "Start flushlsn  ");
        bt_lockpage(BtLockRead, latch, thread_no);
 
        if( latch->dirty ) {
-         bt_writepage(mgr, page, latch->page_no, 0);
+         bt_writepage(mgr, page, latch->page_no);
          latch->dirty = 0, cnt++;
        }
 if( latch->pin & ~CLOCK_bit )
@@ -805,7 +689,9 @@ cnt2++;
   }
 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
 fprintf(stderr, "begin sync");
-       sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER);
+       for( segment = 0; segment < mgr->segments; segment++ )
+         if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
+               fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
 fprintf(stderr, " end sync\n");
 }
 
@@ -819,8 +705,6 @@ uint offset = 0;
 BtKey *key;
 BtVal *val;
 
-  pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
-
   hdr = (BtLogHdr *)mgr->redobuff;
   mgr->flushlsn = hdr->lsn;
 
@@ -843,11 +727,11 @@ BtVal *val;
 }
 
 //     recovery manager -- append new entry to recovery log
-//     flush to disk when it overflows.
+//     flush dirty pages to disk when it overflows.
 
 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
 {
-uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
+uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
 uint amt = sizeof(BtLogHdr);
 BtLogHdr *hdr, *eof;
 uint last, end;
@@ -862,7 +746,8 @@ uint last, end;
 
        if( amt > size - mgr->redoend ) {
          mgr->flushlsn = mgr->lsn;
-         msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
+         if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
+               fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
          mgr->redolast = 0;
          mgr->redoend = 0;
          bt_flushlsn(mgr, thread_no);
@@ -893,22 +778,25 @@ uint last, end;
        memset (eof, 0, sizeof(BtLogHdr));
        eof->lsn = mgr->lsn;
 
-       last = mgr->redolast & 0xfff;
+       last = mgr->redolast & ~0xfff;
        end = mgr->redoend;
-       mgr->redolast = end;
 
-       bt_releasemutex(mgr->redo);
+       if( end - last + sizeof(BtLogHdr) >= 8192 )
+         if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
+               fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+         else
+               mgr->redolast = end;
 
-       msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
+       bt_releasemutex(mgr->redo);
        return hdr->lsn;
 }
 
 //     recovery manager -- append transaction to recovery log
-//     flush to disk when it overflows.
+//     flush dirty pages to disk when it overflows.
 
 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
 {
-uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
+uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
 uint amt = 0, src, type;
 BtLogHdr *hdr, *eof;
 uint last, end;
@@ -932,7 +820,8 @@ BtVal *val;
 
        if( amt > size - mgr->redoend ) {
          mgr->flushlsn = mgr->lsn;
-         msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
+         if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
+               fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
          mgr->redolast = 0;
          mgr->redoend = 0;
          bt_flushlsn (mgr, thread_no);
@@ -981,12 +870,15 @@ BtVal *val;
        memset (eof, 0, sizeof(BtLogHdr));
        eof->lsn = lsn;
 
-       last = mgr->redolast & 0xfff;
+       last = mgr->redolast & ~0xfff;
        end = mgr->redoend;
-       mgr->redolast = end;
-       bt_releasemutex(mgr->redo);
 
-       msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
+       if( end - last + sizeof(BtLogHdr) >= 8192 )
+         if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
+               fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+         else
+               mgr->redolast = end;
+
        bt_releasemutex(mgr->redo);
        return lsn;
 }
@@ -995,27 +887,16 @@ BtVal *val;
 
 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
 {
-off64_t off = page_no << mgr->page_bits;
-
-       if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
-               fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
-               return BTERR_read;
-       }
-if( page->page_no != page_no )
-abort();
-       mgr->reads++;
-       return 0;
-/*
 int flag = PROT_READ | PROT_WRITE;
-uint segment = page_no >> 32;
-unsigned char *perm;
+uint segment = page_no >> 16;
+BtPage perm;
 
   while( 1 ) {
        if( segment < mgr->segments ) {
-         perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
-               memcpy (page, perm, mgr->page_size);
-if( page->page_no != page_no )
+         perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
+if( perm->page_no != page_no )
 abort();
+               memcpy (page, perm, mgr->page_size);
                mgr->reads++;
                return 0;
        }
@@ -1031,33 +912,24 @@ abort();
        mgr->segments++;
 
        bt_releasemutex (mgr->maps);
-  }  */
+  }
 }
 
 //     write page to permanent location in Btree file
 //     clear the dirty bit
 
-BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit)
+BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
 {
-off64_t off = page_no << mgr->page_bits;
-
-       if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size ) {
-               fprintf (stderr, "Unable to write page %d errno = %d\n", page_no, errno);
-               return BTERR_wrt;
-       }
-       mgr->writes++;
-       return 0;
-/*
 int flag = PROT_READ | PROT_WRITE;
-uint segment = page_no >> 32;
-unsigned char *perm;
+uint segment = page_no >> 16;
+BtPage perm;
 
   while( 1 ) {
        if( segment < mgr->segments ) {
-         perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
+         perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
+if( page_no > LEAF_page && perm->page_no != page_no)
+abort();
                memcpy (perm, page, mgr->page_size);
-               if( syncit )
-                       msync (perm, mgr->page_size, MS_SYNC);
                mgr->writes++;
                return 0;
        }
@@ -1072,7 +944,7 @@ unsigned char *perm;
        mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
        bt_releasemutex (mgr->maps);
        mgr->segments++;
-  } */
+  }
 }
 
 //     set CLOCK bit in latch
@@ -1091,7 +963,8 @@ void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch)
 
 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
 {
-BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool);
+uid entry = latch - mgr->latchsets;
+BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
 
   return page;
 }
@@ -1207,7 +1080,7 @@ trynext:
   //   no read-lock is required since page is not pinned.
 
   if( latch->dirty )
-       if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
+       if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
          return mgr->line = __LINE__, NULL;
        else
          latch->dirty = 0;
@@ -1216,7 +1089,7 @@ trynext:
        memcpy (page, contents, mgr->page_size);
        latch->dirty = 1;
   } else if( bt_readpage (mgr, page, page_no) )
-         return mgr->line = __LINE__, NULL;
+       return mgr->line = __LINE__, NULL;
 
   //  link page as head of hash table chain
   //  if this is a never before used entry,
@@ -1237,7 +1110,6 @@ trynext:
 
   latch->pin = CLOCK_bit | 1;
   latch->page_no = page_no;
-  latch->entry = entry;
   latch->split = 0;
 
   bt_releasemutex (latch->modify);
@@ -1261,8 +1133,6 @@ uint slot;
        if( mgr->redoend ) {
                eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
                memset (eof, 0, sizeof(BtLogHdr));
-
-               pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
        }
 
        //      write remaining dirty pool pages to the btree
@@ -1272,24 +1142,19 @@ uint slot;
                latch = mgr->latchsets + slot;
 
                if( latch->dirty ) {
-                       bt_writepage(mgr, page, latch->page_no, 0);
+                       bt_writepage(mgr, page, latch->page_no);
                        latch->dirty = 0, num++;
                }
        }
 
-       //      flush last batch to disk and clear
-       //      redo recovery buffer on disk.
-
-       fdatasync (mgr->idx);
+       //      clear redo recovery buffer on disk.
 
-       if( mgr->redopages ) {
+       if( mgr->pagezero->redopages ) {
                eof = (BtLogHdr *)mgr->redobuff;
                memset (eof, 0, sizeof(BtLogHdr));
                eof->lsn = mgr->lsn;
-
-               pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
-
-               sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+               if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
+                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
        }
 
        fprintf(stderr, "%d buffer pool pages flushed\n", num);
@@ -1308,7 +1173,6 @@ uint slot;
        CloseHandle(mgr->hpool);
 #endif
 #ifdef unix
-       free (mgr->redobuff);
        close (mgr->idx);
        free (mgr);
 #else
@@ -1428,17 +1292,18 @@ BtVal *val;
        memset (pagezero, 0, 1 << bits);
        pagezero->alloc->lvl = MIN_lvl - 1;
        pagezero->alloc->bits = mgr->page_bits;
-       bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
+       pagezero->redopages = redopages;
+
+       bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
        pagezero->activepages = 2;
 
        //  initialize left-most LEAF page in
        //      alloc->left and count of active leaf pages.
 
        bt_putid (pagezero->alloc->left, LEAF_page);
+       ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
 
-       ftruncate (mgr->idx, REDO_page << mgr->page_bits);
-
-       if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) {
+       if( bt_writepage (mgr, pagezero->alloc, 0) ) {
                fprintf (stderr, "Unable to create btree page zero\n");
                return bt_mgrclose (mgr), NULL;
        }
@@ -1465,7 +1330,7 @@ BtVal *val;
                pagezero->alloc->act = 1;
                pagezero->alloc->page_no = MIN_lvl - lvl;
 
-               if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) {
+               if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
                        fprintf (stderr, "Unable to create btree page\n");
                        return bt_mgrclose (mgr), NULL;
                }
@@ -1478,25 +1343,28 @@ mgrlatch:
        VirtualFree (pagezero, 0, MEM_RELEASE);
 #endif
 #ifdef unix
-       // mlock the pagezero page
+       // mlock the first segment of 64K pages
 
        flag = PROT_READ | PROT_WRITE;
-       mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
-       if( mgr->pagezero == MAP_FAILED ) {
-               fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
+       mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
+       mgr->segments = 1;
+
+       if( mgr->pages[0] == MAP_FAILED ) {
+               fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
        }
+
+       mgr->pagezero = (BtPageZero *)mgr->pages[0];
        mlock (mgr->pagezero, mgr->page_size);
 
+       mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
+       mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
+
        mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
        if( mgr->pagepool == MAP_FAILED ) {
                fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
        }
-       if( mgr->redopages = redopages ) {
-               ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits);
-               mgr->redobuff = valloc (redopages * mgr->page_size);
-       }
 #else
        flag = PAGE_READWRITE;
        mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
@@ -1526,8 +1394,6 @@ mgrlatch:
                fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
                return bt_mgrclose (mgr), NULL;
        }
-       if( mgr->redopages = redopages )
-               mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
 #endif
 
        mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
@@ -1663,11 +1529,14 @@ int blk;
                else
                        return mgr->line = __LINE__, mgr->err = BTERR_struct;
 
-               bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
                mgr->pagezero->activepages++;
+               bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
+               if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
+                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
 
                bt_releasemutex(mgr->lock);
 
+               contents->page_no = page_no;
                memcpy (set->page, contents, mgr->page_size);
                set->latch->dirty = 1;
                return 0;
@@ -1680,13 +1549,14 @@ int blk;
        //      extend file into new page.
 
        mgr->pagezero->activepages++;
-
-       ftruncate (mgr->idx, (uid)(page_no + 1) << mgr->page_bits);
+       if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
+         fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
        bt_releasemutex(mgr->lock);
 
-       //      don't load cache from btree page, load it from contents
-
        contents->page_no = page_no;
+       pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
+
+       //      don't load cache from btree page, load it from contents
 
        if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
                set->page = bt_mappage (mgr, set->latch);
@@ -1848,16 +1718,18 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set)
        set->page->free = 1;
 
        // decrement active page count
-       // and unlock allocation page
 
        mgr->pagezero->activepages--;
-       bt_releasemutex (mgr->lock);
+       if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
+         fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
 
        // unlock released page
+       // and unlock allocation page
 
        bt_unlockpage (BtLockDelete, set->latch);
        bt_unlockpage (BtLockWrite, set->latch);
        bt_unpinlatch (mgr, set->latch);
+       bt_releasemutex (mgr->lock);
 }
 
 //     a fence key was deleted from a page
@@ -1952,6 +1824,9 @@ uint idx;
 //  delete a page and manage keys
 //  call with page writelocked
 
+//     returns with page removed
+//     from the page pool.
+
 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
 {
 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
@@ -1961,6 +1836,7 @@ BtPageSet right[1];
 uid page_no;
 BtKey *ptr;
 
+
        //      cache copy of fence key
        //      to remove in parent
 
@@ -1989,6 +1865,7 @@ BtKey *ptr;
        // pull contents of right peer into our empty page
 
        memcpy (set->page, right->page, mgr->page_size);
+       set->page->page_no = set->latch->page_no;
        set->latch->dirty = 1;
 
        // mark right page deleted and point it to left page
@@ -2028,6 +1905,7 @@ BtKey *ptr;
        bt_freepage (mgr, right);
 
        bt_unlockpage (BtLockParent, set->latch);
+       bt_unpinlatch (mgr, set->latch);
        return 0;
 }
 
@@ -2106,9 +1984,9 @@ BtVal *val;
        } else {
          set->latch->dirty = 1;
          bt_unlockpage(BtLockWrite, set->latch);
+         bt_unpinlatch (mgr, set->latch);
        }
 
-       bt_unpinlatch (mgr, set->latch);
        return 0;
 }
 
@@ -2298,9 +2176,13 @@ uint nxt = mgr->page_size;
 unsigned char value[BtId];
 BtPageSet left[1];
 uid left_page_no;
+BtPage frame;
 BtKey *ptr;
 BtVal *val;
 
+       frame = malloc (mgr->page_size);
+       memcpy (frame, root->page, mgr->page_size);
+
        //      save left page fence key for new root
 
        ptr = keyptr(root->page, root->page->cnt);
@@ -2309,11 +2191,12 @@ BtVal *val;
        //  Obtain an empty page to use, and copy the current
        //  root contents into it, e.g. lower keys
 
-       if( bt_newpage(mgr, left, root->page, page_no) )
+       if( bt_newpage(mgr, left, frame, page_no) )
                return mgr->err;
 
        left_page_no = left->latch->page_no;
        bt_unpinlatch (mgr, left->latch);
+       free (frame);
 
        // preserve the page info at the bottom
        // of higher keys and set rest to zero
@@ -2478,8 +2361,9 @@ uint prev;
        bt_putid(set->page->right, right->latch->page_no);
        set->page->min = nxt;
        set->page->cnt = idx;
+       free(frame);
 
-       return right->latch->entry;
+       return right->latch - mgr->latchsets;
 }
 
 //     fix keys for newly split page
@@ -2848,7 +2732,7 @@ BtVal *val;
        set->page->act--;
 
        node->dead = 0;
-       mgr->found++;
+       __sync_fetch_and_add(&mgr->found, 1);
        return 0;
 }
 
@@ -2985,7 +2869,7 @@ int type;
 
   //  add entries to redo log
 
-  if( bt->mgr->redopages )
+  if( bt->mgr->pagezero->redopages )
    if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
     for( src = 0; src++ < source->cnt; )
         locks[src].lsn = lsn;
@@ -3022,7 +2906,7 @@ int type;
          ptr = keyptr(set->page, slot);
 
        if( !samepage ) {
-         locks[src].entry = set->latch->entry;
+         locks[src].entry = set->latch - bt->mgr->latchsets;
          locks[src].slot = slot;
          locks[src].reuse = 0;
        } else {
@@ -3134,8 +3018,8 @@ int type;
          leaf = calloc (sizeof(AtomicKey), 1), que++;
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+         leaf->entry = prev->latch - bt->mgr->latchsets;
          leaf->page_no = prev->latch->page_no;
-         leaf->entry = prev->latch->entry;
          leaf->type = 0;
 
          if( tail )
@@ -3183,8 +3067,8 @@ int type;
          leaf = calloc (sizeof(AtomicKey), 1), que++;
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+         leaf->entry = prev->latch - bt->mgr->latchsets;
          leaf->page_no = prev->latch->page_no;
-         leaf->entry = prev->latch->entry;
          leaf->type = 0;
   
          if( tail )
@@ -3229,8 +3113,8 @@ int type;
        leaf = calloc (sizeof(AtomicKey), 1), que++;
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+       leaf->entry = prev->latch - bt->mgr->latchsets;
        leaf->page_no = prev->latch->page_no;
-       leaf->entry = prev->latch->entry;
        leaf->nounlock = 1;
        leaf->type = 2;
   
@@ -3289,7 +3173,8 @@ int type;
   // is greater than the buffer pool
   // promote page into larger btree
 
-  while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
+  if( bt->main )
+   while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
        if( bt_txnpromote (bt) )
          return bt->mgr->err;
 
@@ -3326,16 +3211,10 @@ BtVal *val;
        if( !bt_mutextry(set->latch->modify) )
                continue;
 
-//    if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
-//             bt_releasemutex(set->latch->modify);
-//             continue;
-//     }
-
        //  skip this entry if it is pinned
 
        if( set->latch->pin & ~CLOCK_bit ) {
          bt_releasemutex(set->latch->modify);
-//      bt_releasemutex(bt->mgr->hashtable[idx].latch);
          continue;
        }
 
@@ -3345,7 +3224,6 @@ BtVal *val;
 
        if( !set->latch->page_no || !bt_getid (set->page->right) ) {
          bt_releasemutex(set->latch->modify);
-//      bt_releasemutex(bt->mgr->hashtable[idx].latch);
          continue;
        }
 
@@ -3357,7 +3235,6 @@ BtVal *val;
 
        if( set->page->lvl || set->page->free || set->page->kill ) {
          bt_releasemutex(set->latch->modify);
-//      bt_releasemutex(bt->mgr->hashtable[idx].latch);
       bt_unlockpage(BtLockWrite, set->latch);
          continue;
        }
@@ -3366,16 +3243,6 @@ BtVal *val;
 
        set->latch->pin++;
        bt_releasemutex(set->latch->modify);
-//    bt_releasemutex(bt->mgr->hashtable[idx].latch);
-
-       //      if page is dirty, then
-       //      sync it to the disk first.
-
-       if( set->latch->dirty )
-        if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) )
-         return bt->mgr->line = __LINE__, bt->mgr->err;
-        else
-         set->latch->dirty = 0;
 
        // transfer slots in our selected page to larger btree
 if( !(set->latch->page_no % 100) )
@@ -3407,7 +3274,6 @@ fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cn
        if( bt_deletepage (bt->mgr, set, bt->thread_no) )
                return bt->mgr->err;
 
-       bt_unpinlatch (bt->mgr, set->latch);
        return 0;
   }
 }
@@ -3824,7 +3690,7 @@ FILE *in;
                posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
 #endif
                fprintf(stderr, "started counting\n");
-               next = LEAF_page + bt->mgr->redopages + 1;
+               next = REDO_page + bt->mgr->pagezero->redopages;
 
                while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
                        if( bt_readpage (bt->mgr, bt->cursor, page_no) )
@@ -3842,7 +3708,6 @@ FILE *in;
                break;
        }
 
-       fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found);
        bt_close (bt);
 #ifdef unix
        return NULL;
@@ -3864,8 +3729,8 @@ pthread_t *threads;
 HANDLE *threads;
 #endif
 ThreadArg *args;
+uint redopages = 0;
 uint poolsize = 0;
-uint recovery = 0;
 uint mainpool = 0;
 uint mainbits = 0;
 float elapsed;
@@ -3905,7 +3770,10 @@ BtKey *ptr;
                num = atoi(argv[6]);
 
        if( argc > 7 )
-               recovery = atoi(argv[7]);
+               redopages = atoi(argv[7]);
+
+       if( redopages + REDO_page > 65535 )
+               fprintf (stderr, "Warning: Recovery buffer too large\n");
 
        if( argc > 8 )
                mainbits = atoi(argv[8]);
@@ -3921,19 +3789,22 @@ BtKey *ptr;
 #endif
        args = malloc ((cnt + 1) * sizeof(ThreadArg));
 
-       mgr = bt_mgr (argv[1], bits, poolsize, recovery);
+       mgr = bt_mgr (argv[1], bits, poolsize, redopages);
 
        if( !mgr ) {
                fprintf(stderr, "Index Open Error %s\n", argv[1]);
                exit (1);
        }
 
-       main = bt_mgr (argv[2], mainbits, mainpool, 0);
+       if( mainbits ) {
+               main = bt_mgr (argv[2], mainbits, mainpool, 0);
 
-       if( !main ) {
-               fprintf(stderr, "Index Open Error %s\n", argv[2]);
-               exit (1);
-       }
+               if( !main ) {
+                       fprintf(stderr, "Index Open Error %s\n", argv[2]);
+                       exit (1);
+               }
+       } else
+               main = NULL;
 
        //      fire off threads
 
@@ -3974,9 +3845,14 @@ BtKey *ptr;
                CloseHandle(threads[idx]);
 #endif
        bt_poolaudit(mgr);
-       bt_poolaudit(main);
 
-       bt_mgrclose (main);
+       if( main )
+               bt_poolaudit(main);
+
+       fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
+
+       if( main )
+               bt_mgrclose (main);
        bt_mgrclose (mgr);
 
        elapsed = getCpuTime(0) - start;