pd.if.org Git - btree/commitdiff
threadskv9c.c version: progress made on durability
author unknown <karl@E04.petzent.com>
Wed, 8 Oct 2014 19:52:16 +0000 (12:52 -0700)
committer unknown <karl@E04.petzent.com>
Wed, 8 Oct 2014 19:52:16 +0000 (12:52 -0700)
threadskv9c.c [moved from threadskv9.c with 80% similarity]

similarity index 80%
rename from threadskv9.c
rename to threadskv9c.c
index 1894d1f24adfbd2926f88077a73388f686b82b37..6029526f8108525eab3a969e9e0a85253822a323 100644 (file)
@@ -1,4 +1,4 @@
-// btree version threadskv9 sched_yield version
+// btree version threadskv9c FUTEX version
 //     with reworked bt_deletekey code,
 //     phase-fair reader writer lock,
 //     librarian page split code,
@@ -8,7 +8,7 @@
 //     ACID batched key-value updates
 //     and redo log for failure recovery
 
-// 01 OCT 2014
+// 07 OCT 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -34,6 +34,8 @@ REDISTRIBUTION OF THIS SOFTWARE.
 
 #ifdef linux
 #define _GNU_SOURCE
+#include <linux/futex.h>
+#define SYS_futex 202
 #endif
 
 #ifdef unix
@@ -45,6 +47,7 @@ REDISTRIBUTION OF THIS SOFTWARE.
 #include <sys/mman.h>
 #include <errno.h>
 #include <pthread.h>
+#include <limits.h>
 #else
 #define WIN32_LEAN_AND_MEAN
 #include <windows.h>
@@ -117,11 +120,10 @@ typedef struct {
        ushort dup;
 } RWLock;
 
-//     write only queue lock
+//     write only lock
 
 typedef struct {
-       volatile ushort ticket[1];
-       volatile ushort serving[1];
+       volatile uint exclusive[1];
        ushort tid;
        ushort dup;
 } WOLock;
@@ -131,44 +133,46 @@ typedef struct {
 #define MASK 0x3
 #define RINC 0x4
 
-//     definition for spin latch implementation
+//     lite weight mutex
 
 // exclusive is set for write access
-// share is count of read accessors
-// grant write lock when share == 0
 
-volatile typedef struct {
-       ushort exclusive:1;
-       ushort pending:1;
-       ushort share:14;
-} BtSpinLatch;
+typedef struct {
+       volatile uint exclusive[1];
+} BtMutexLatch;
 
 #define XCL 1
-#define PEND 2
-#define BOTH 3
-#define SHARE 4
+
+//     mode & definition for lite latch implementation
+
+enum {
+       QueRd = 1,              // reader queue
+       QueWr = 2               // writer queue
+} RWQueue;
 
 //  hash table entries
 
 typedef struct {
-       volatile uint slot;             // Latch table entry at head of chain
-       BtSpinLatch latch[1];
+       volatile uint entry;            // Latch table entry at head of chain
+       BtMutexLatch latch[1];
 } BtHashEntry;
 
 //     latch manager table structure
 
 typedef struct {
-       uid page_no;                    // latch set page number
-       RWLock readwr[1];               // read/write page lock
-       RWLock access[1];               // Access Intent/Page delete
-       WOLock parent[1];               // Posting of fence key in parent
-       WOLock atomic[1];               // Atomic update in progress
-       uint split;                             // right split page atomic insert
-       uint entry;                             // entry slot in latch table
-       uint next;                              // next entry in hash table chain
-       uint prev;                              // prev entry in hash table chain
-       volatile ushort pin;    // number of outstanding threads
-       ushort dirty:1;                 // page in cache is dirty
+       uid page_no;                                    // latch set page number
+       RWLock readwr[1];                               // read/write page lock
+       RWLock access[1];                               // Access Intent/Page delete
+       WOLock parent[1];                               // Posting of fence key in parent
+       WOLock atomic[1];                               // Atomic update in progress
+       uint split;                                             // right split page atomic insert
+       uint entry;                                             // entry slot in latch table
+       uint next;                                              // next entry in hash table chain
+       uint prev;                                              // prev entry in hash table chain
+       BtMutexLatch modify[1];                 // modify entry lite latch
+       volatile ushort pin;                    // number of accessing threads
+       volatile unsigned char dirty;   // page in cache is dirty (atomic set)
+       volatile unsigned char avail;   // page is an available entry
 } BtLatchSet;
 
 //     Define the length of the page record numbers
@@ -248,7 +252,7 @@ typedef struct BtPage_ {
        unsigned char right[BtId];      // page number to right
        unsigned char left[BtId];       // page number to left
        unsigned char filler[2];        // padding to multiple of 8
-       logseqno lsn;                           // last LogSeqNo applied to page
+       logseqno lsn;                           // log sequence number applied
 } *BtPage;
 
 //  The loadpage interface object
@@ -281,15 +285,18 @@ typedef struct {
        BtLatchSet *latchsets;          // mapped latch set from buffer pool
        unsigned char *pagepool;        // mapped to the buffer pool pages
        unsigned char *redobuff;        // mapped recovery buffer pointer
-       logseqno flushlsn;                      // first lsn flushed w/msync
-       BtSpinLatch redo[1];            // redo area lite latch
-       BtSpinLatch lock[1];            // allocation area lite latch
+       logseqno lsn, flushlsn;         // current & first lsn flushed
+       BtMutexLatch dump[1];           // redo dump lite latch
+       BtMutexLatch redo[1];           // redo area lite latch
+       BtMutexLatch lock[1];           // allocation area lite latch
        ushort thread_no[1];            // next thread number
-       uint latchdeployed;                     // highest number of latch entries deployed
        uint nlatchpage;                        // number of latch pages at BT_latch
        uint latchtotal;                        // number of page latch entries
        uint latchhash;                         // number of latch hash table slots
        uint latchvictim;                       // next latch entry to examine
+       uint latchavail;                        // next available latch entry
+       uint availlock[1];                      // latch available chain commitments
+       uint available;                         // size of latch available chain
        uint redopages;                         // size of recovery buff in pages
        uint redoend;                           // eof/end element in recovery buff
 #ifndef unix
@@ -307,6 +314,7 @@ typedef struct {
        unsigned char key[BT_keyarray]; // last found complete key
        int found;                              // last delete or insert was found
        int err;                                // last error
+       int line;                               // last error line no
        int reads, writes;              // number of reads and writes from the btree
        ushort thread_no;               // thread number
 } BtDb;
@@ -322,7 +330,8 @@ typedef enum {
        BTERR_read,
        BTERR_wrt,
        BTERR_atomic,
-       BTERR_recovery
+       BTERR_recovery,
+       BTERR_avail
 } BTERR;
 
 #define CLOCK_bit 0x8000
@@ -336,14 +345,14 @@ typedef enum {
        BTRM_del,               // delete a key-value from btree
        BTRM_upd,               // update a key with a new value
        BTRM_new,               // allocate a new empty page
-       BTRM_old,               // reuse an old empty page
-       BTRM_end = 255  // circular buffer inter-gap
+       BTRM_old                // reuse an old empty page
 } BTRM;
 
 // recovery manager entry
 //     structure followed by BtKey & BtVal
 
 typedef struct {
+       logseqno reqlsn;        // log sequence number required
        logseqno lsn;           // log sequence number for entry
        uint len;                       // length of entry
        unsigned char type;     // type of entry
@@ -354,7 +363,8 @@ typedef struct {
 
 extern void bt_close (BtDb *bt);
 extern BtDb *bt_open (BtMgr *mgr);
-extern void bt_flushlsn (BtDb *bt);
+extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
+extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
 extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
 extern void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
@@ -368,6 +378,7 @@ extern uint bt_nextkey   (BtDb *bt, uint slot);
 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages);
 extern void bt_mgrclose (BtMgr *mgr);
 extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val);
+extern logseqno bt_txnredo (BtDb *bt, BtPage page);
 
 //  Helper functions to return slot values
 //     from the cursor page.
@@ -457,26 +468,29 @@ uid bt_newdup (BtDb *bt)
 
 void WriteOLock (WOLock *lock, ushort tid)
 {
-ushort tix;
+uint prev;
 
        if( lock->tid == tid ) {
                lock->dup++;
                return;
        }
+
+       while( 1 ) {
 #ifdef unix
-       tix = __sync_fetch_and_add (lock->ticket, 1);
+         prev = __sync_fetch_and_or (lock->exclusive, 1);
 #else
-       tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
+         prev = _InterlockedExchangeOr (lock->exclusive, 1);
 #endif
-       // wait for our ticket to come up
-
-       while( tix != lock->serving[0] )
+         if( !(prev & XCL) ) {
+               lock->tid = tid;
+               return;
+         }
 #ifdef unix
-               sched_yield();
+         sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
 #else
-               SwitchToThread ();
+         SwitchToThread ();
 #endif
-       lock->tid = tid;
+       }
 }
 
 void WriteORelease (WOLock *lock)
@@ -486,8 +500,11 @@ void WriteORelease (WOLock *lock)
                return;
        }
 
+       *lock->exclusive = 0;
        lock->tid = 0;
-       lock->serving[0]++;
+#ifdef linux
+       sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
+#endif
 }
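
The WOLock above and the BtMutexLatch further down now share one pattern: an atomic fetch-and-or tries to set the XCL bit, and a loser parks on the lock word with FUTEX_WAIT_BITSET until the holder clears the word and posts FUTEX_WAKE_BITSET. A minimal standalone sketch of that pattern (Linux only; the function names and the bare unsigned lock word are illustrative, only the futex constants and the QueWr value come from the patch):

#define _GNU_SOURCE
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

static int futex_op (volatile unsigned *addr, int op, unsigned val, unsigned queue)
{
	return syscall (SYS_futex, addr, op, val, NULL, NULL, queue);
}

void xcl_lock (volatile unsigned *word)
{
	//  loop until we are the thread that changes the word from 0 to 1
	while (__sync_fetch_and_or (word, 1) & 1)
		//  sleep only while the word still reads 1, on the writer queue (QueWr = 2)
		futex_op (word, FUTEX_WAIT_BITSET, 1, 2);
}

void xcl_release (volatile unsigned *word)
{
	*word = 0;					//  clear XCL
	futex_op (word, FUTEX_WAKE_BITSET, 1, 2);	//  wake at most one waiter
}
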
 
 //     Phase-Fair reader/writer lock implementation
@@ -545,6 +562,37 @@ void WriteRelease (RWLock *lock)
        lock->serving[0]++;
 }
 
+//     try to obtain read lock
+//  return 1 if successful
+
+int ReadTry (RWLock *lock, ushort tid)
+{
+ushort w;
+
+       //  OK if write lock already held by same thread
+
+       if( lock->tid == tid ) {
+               lock->dup++;
+               return 1;
+       }
+#ifdef unix
+       w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
+#else
+       w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
+#endif
+       if( w )
+         if( w == (*lock->rin & MASK) ) {
+#ifdef unix
+               __sync_fetch_and_add (lock->rin, -RINC);
+#else
+               _InterlockedExchangeAdd16 (lock->rin, -RINC);
+#endif
+               return 0;
+         }
+
+       return 1;
+}
+
 void ReadLock (RWLock *lock, ushort tid)
 {
 ushort w;
@@ -580,63 +628,31 @@ void ReadRelease (RWLock *lock)
 #endif
 }
 
-//     Spin Latch Manager
-
-//     wait until write lock mode is clear
-//     and add 1 to the share count
+//     lite weight spin lock Latch Manager
 
-void bt_spinreadlock(BtSpinLatch *latch)
+int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
 {
-ushort prev;
-
-  do {
-#ifdef unix
-       prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
-#else
-       prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
-#endif
-       //  see if exclusive request is granted or pending
-
-       if( !(prev & BOTH) )
-               return;
-#ifdef unix
-       prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
-#else
-       prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
-#endif
-#ifdef  unix
-  } while( sched_yield(), 1 );
-#else
-  } while( SwitchToThread(), 1 );
-#endif
+       return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
 }
 
-//     wait for other read and write latches to relinquish
-
-void bt_spinwritelock(BtSpinLatch *latch)
+void bt_mutexlock(BtMutexLatch *latch)
 {
-ushort prev;
+uint prev;
 
-  do {
+  while( 1 ) {
 #ifdef  unix
-       prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
+       prev = __sync_fetch_and_or(latch->exclusive, XCL);
 #else
-       prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
+       prev = _InterlockedOr(latch->exclusive, XCL);
 #endif
        if( !(prev & XCL) )
-         if( !(prev & ~BOTH) )
                return;
-         else
-#ifdef unix
-               __sync_fetch_and_and ((ushort *)latch, ~XCL);
-#else
-               _InterlockedAnd16((ushort *)latch, ~XCL);
-#endif
 #ifdef  unix
-  } while( sched_yield(), 1 );
+       sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
 #else
-  } while( SwitchToThread(), 1 );
+       SwitchToThread();
 #endif
+  }
 }
 
 //     try to obtain write lock
@@ -644,69 +660,122 @@ ushort prev;
 //     return 1 if obtained,
 //             0 otherwise
 
-int bt_spinwritetry(BtSpinLatch *latch)
+int bt_mutextry(BtMutexLatch *latch)
 {
-ushort prev;
+uint prev;
 
 #ifdef  unix
-       prev = __sync_fetch_and_or((ushort *)latch, XCL);
+       prev = __sync_fetch_and_or(latch->exclusive, XCL);
 #else
-       prev = _InterlockedOr16((ushort *)latch, XCL);
+       prev = _InterlockedOr(latch->exclusive, XCL);
 #endif
-       //      take write access if all bits are clear
+       //      take write access if exclusive bit is clear
 
-       if( !(prev & XCL) )
-         if( !(prev & ~BOTH) )
-               return 1;
-         else
-#ifdef unix
-               __sync_fetch_and_and ((ushort *)latch, ~XCL);
-#else
-               _InterlockedAnd16((ushort *)latch, ~XCL);
-#endif
-       return 0;
+       return !(prev & XCL);
 }
 
 //     clear write mode
 
-void bt_spinreleasewrite(BtSpinLatch *latch)
+void bt_releasemutex(BtMutexLatch *latch)
 {
+       *latch->exclusive = 0;
 #ifdef unix
-       __sync_fetch_and_and((ushort *)latch, ~BOTH);
-#else
-       _InterlockedAnd16((ushort *)latch, ~BOTH);
+       sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
 #endif
 }
 
-//     decrement reader count
+//     recovery manager -- flush dirty pages
 
-void bt_spinreleaseread(BtSpinLatch *latch)
+void bt_flushlsn (BtDb *bt)
 {
-#ifdef unix
-       __sync_fetch_and_add((ushort *)latch, -SHARE);
-#else
-       _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
-#endif
+uint cnt3 = 0, cnt2 = 0, cnt = 0;
+BtLatchSet *latch;
+BtPage page;
+uint entry;
+
+  //   flush dirty pool pages to the btree that were
+  //   dirty before the start of this redo recovery buffer
+fprintf(stderr, "Start flushlsn\n");
+  for( entry = 1; entry < bt->mgr->latchtotal; entry++ ) {
+       page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
+       latch = bt->mgr->latchsets + entry;
+       bt_mutexlock (latch->modify);
+       bt_lockpage(bt, BtLockRead, latch);
+
+       if( latch->dirty ) {
+         bt_writepage(bt->mgr, page, latch->page_no);
+         latch->dirty = 0, bt->writes++, cnt++;
+       }
+if( latch->avail )
+cnt3++;
+if( latch->pin & ~CLOCK_bit )
+cnt2++;
+       bt_unlockpage(bt, BtLockRead, latch);
+       bt_releasemutex (latch->modify);
+  }
+fprintf(stderr, "End flushlsn %d pages  %d pinned  %d available\n", cnt, cnt2, cnt3);
+}
+
+//     recovery manager -- process current recovery buff on startup
+//     this won't do much if previous session was properly closed.
+
+BTERR bt_recoveryredo (BtMgr *mgr)
+{
+BtLogHdr *hdr, *eof;
+uint offset = 0;
+BtKey *key;
+BtVal *val;
+
+  pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
+
+  hdr = (BtLogHdr *)mgr->redobuff;
+  mgr->flushlsn = hdr->lsn;
+
+  while( 1 ) {
+       hdr = (BtLogHdr *)(mgr->redobuff + offset);
+       switch( hdr->type ) {
+       case BTRM_eof:
+               mgr->lsn = hdr->lsn;
+               return 0;
+       case BTRM_add:          // add a unique key-value to btree
+       
+       case BTRM_dup:          // add a duplicate key-value to btree
+       case BTRM_del:          // delete a key-value from btree
+       case BTRM_upd:          // update a key with a new value
+       case BTRM_new:          // allocate a new empty page
+       case BTRM_old:          // reuse an old empty page
+               return 0;
+       }
+  }
 }
 
-//     recovery manager -- dump current recovery buff & flush
+//     recovery manager -- dump current recovery buff & flush dirty pages
+//             in preparation for next recovery buffer.
 
 BTERR bt_dumpredo (BtDb *bt)
 {
 BtLogHdr *eof;
+fprintf(stderr, "Flush pages ");
 
        eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
        memset (eof, 0, sizeof(BtLogHdr));
 
-       pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits);
-
        //  flush pages written at beginning of this redo buffer
-       //      along with the redo buffer out to disk
+       //      then write the redo buffer out to disk
 
        fdatasync (bt->mgr->idx);
 
-       bt->mgr->flushlsn = bt->mgr->pagezero->alloc->lsn;
+fprintf(stderr, "Dump ReDo: %d bytes\n", bt->mgr->redoend);
+       pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits);
+
+       sync_file_range (bt->mgr->idx, REDO_page << bt->mgr->page_bits, bt->mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+
+       bt->mgr->flushlsn = bt->mgr->lsn;
        bt->mgr->redoend = 0;
+
+       eof = (BtLogHdr *)(bt->mgr->redobuff);
+       memset (eof, 0, sizeof(BtLogHdr));
+       eof->lsn = bt->mgr->lsn;
        return 0;
 }
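
bt_dumpredo above reworks the write ordering for a full recovery buffer: first fdatasync the btree pages already written back for this buffer, then pwrite the redo buffer to its fixed offset, then wait on that range with sync_file_range before resetting redoend. A compressed sketch of the same ordering, with hypothetical parameter names standing in for the manager fields (SYNC_FILE_RANGE_WRITE could additionally be OR'ed in to start the write-out explicitly before waiting):

#define _GNU_SOURCE
#include <fcntl.h>
#include <unistd.h>

int redo_dump (int idx, void *redobuff, size_t bytes, off_t redo_off)
{
	//  1. make previously written btree pages durable
	if (fdatasync (idx))
		return -1;

	//  2. write the in-memory redo buffer at its fixed file offset
	if (pwrite (idx, redobuff, bytes, redo_off) < 0)
		return -1;

	//  3. wait for that byte range to be written back
	return sync_file_range (idx, redo_off, bytes, SYNC_FILE_RANGE_WAIT_AFTER);
}
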
 
@@ -720,7 +789,7 @@ uint amt = sizeof(BtLogHdr);
 BtLogHdr *hdr, *eof;
 uint flush;
 
-       bt_spinwritelock (bt->mgr->redo);
+       bt_mutexlock (bt->mgr->redo);
 
        if( key )
          amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
@@ -728,9 +797,12 @@ uint flush;
        //      see if new entry fits in buffer
        //      flush and reset if it doesn't
 
-       if( flush = amt > size - bt->mgr->redoend )
+       if( flush = amt > size - bt->mgr->redoend ) {
+         bt_mutexlock (bt->mgr->dump);
+
          if( bt_dumpredo (bt) )
                return 0;
+       }
 
        //      fill in new entry & either eof or end block
 
@@ -739,7 +811,7 @@ uint flush;
        hdr->len = amt;
        hdr->type = type;
        hdr->lvl = lvl;
-       hdr->lsn = ++bt->mgr->pagezero->alloc->lsn;
+       hdr->lsn = ++bt->mgr->lsn;
 
        bt->mgr->redoend += amt;
 
@@ -753,14 +825,105 @@ uint flush;
          memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
        }
 
-       bt_spinreleasewrite(bt->mgr->redo);
+       bt_releasemutex(bt->mgr->redo);
 
-       if( flush )
+       if( flush ) {
          bt_flushlsn (bt);
+         bt_releasemutex(bt->mgr->dump);
+       }
 
        return hdr->lsn;
 }
 
+//     recovery manager -- append transaction to recovery log
+//     flush to disk when it overflows.
+
+logseqno bt_txnredo (BtDb *bt, BtPage source)
+{
+uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
+uint amt = 0, src, type;
+BtLogHdr *hdr, *eof;
+logseqno lsn;
+uint flush;
+BtKey *key;
+BtVal *val;
+
+       //      determine amount of redo recovery log space required
+
+       for( src = 0; src++ < source->cnt; ) {
+         key = keyptr(source,src);
+         val = valptr(source,src);
+         amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
+         amt += sizeof(BtLogHdr);
+       }
+
+       bt_mutexlock (bt->mgr->redo);
+
+       //      see if new entry fits in buffer
+       //      flush and reset if it doesn't
+
+       if( flush = amt > size - bt->mgr->redoend ) {
+         bt_mutexlock (bt->mgr->dump);
+
+         if( bt_dumpredo (bt) )
+               return 0;
+       }
+
+       //      assign new lsn to transaction
+
+       lsn = ++bt->mgr->lsn;
+
+       //      fill in new entries
+
+       for( src = 0; src++ < source->cnt; ) {
+         key = keyptr(source, src);
+         val = valptr(source, src);
+
+         switch( slotptr(source, src)->type ) {
+         case Unique:
+               type = BTRM_add;
+               break;
+         case Duplicate:
+               type = BTRM_dup;
+               break;
+         case Delete:
+               type = BTRM_del;
+               break;
+         case Update:
+               type = BTRM_upd;
+               break;
+         }
+
+         amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
+         amt += sizeof(BtLogHdr);
+
+         hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
+         hdr->len = amt;
+         hdr->type = type;
+         hdr->lsn = lsn;
+         hdr->lvl = 0;
+
+         //  fill in key and value
+
+         memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
+         memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
+
+         bt->mgr->redoend += amt;
+       }
+
+       eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
+       memset (eof, 0, sizeof(BtLogHdr));
+
+       bt_releasemutex(bt->mgr->redo);
+
+       if( flush ) {
+         bt_flushlsn (bt);
+         bt_releasemutex(bt->mgr->dump);
+       }
+
+       return lsn;
+}
+
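
Each record that bt_newredo and bt_txnredo append is a BtLogHdr followed immediately by the BtKey and BtVal images, records are laid end to end, and a zeroed header marks the end of the log (that is what the memset of the eof header produces). A hedged sketch of how a recovery pass could walk such a buffer; the header shape is copied from the patch, the logseqno width is assumed, and the walker itself is illustrative rather than the patch's bt_recoveryredo:

typedef unsigned long long logseqno;	//  assumed width; typedef not shown in this hunk

typedef struct {			//  shape as declared earlier in the patch
	logseqno reqlsn;		//  log sequence number required
	logseqno lsn;			//  log sequence number for entry
	unsigned len;			//  length of entry: header + key + value
	unsigned char type;		//  BTRM_add, BTRM_del, ... ; zero means eof
	unsigned char lvl;		//  btree level
} BtLogHdr;

void redo_walk (unsigned char *redobuff)
{
	unsigned offset = 0;
	BtLogHdr *hdr;

	while (1) {
		hdr = (BtLogHdr *)(redobuff + offset);

		if (!hdr->type)		//  zeroed eof header ends the buffer
			break;

		//  key image starts at (hdr + 1) and the value image follows it;
		//  a real pass would re-apply or verify the operation here

		offset += hdr->len;	//  len covers the header plus both images
	}
}
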
 //     read page into buffer pool from permanent location in Btree file
 
 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
@@ -769,7 +932,7 @@ off64_t off = page_no << mgr->page_bits;
 
 #ifdef unix
        if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
-               fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
+               fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
                return BTERR_read;
        }
 #else
@@ -781,7 +944,7 @@ uint amt[1];
        ovl->OffsetHigh = off >> 32;
 
        if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
-               fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
+               fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError());
                return BTERR_read;
        }
        if( *amt <  mgr->page_size ) {
@@ -819,46 +982,15 @@ uint amt[1];
        return 0;
 }
 
-//     link latch table entry into head of latch hash table
-
-BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
-{
-BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
-BtLatchSet *latch = bt->mgr->latchsets + slot;
-
-       if( latch->next = bt->mgr->hashtable[hashidx].slot )
-               bt->mgr->latchsets[latch->next].prev = slot;
-
-       bt->mgr->hashtable[hashidx].slot = slot;
-       latch->page_no = page_no;
-       latch->entry = slot;
-       latch->split = 0;
-       latch->prev = 0;
-       latch->pin = 1;
-
-       if( loadit )
-         if( bt->err = bt_readpage (bt->mgr, page, page_no) )
-               return bt->err;
-         else
-               bt->reads++;
-
-       return bt->err = 0;
-}
-
 //     set CLOCK bit in latch
 //     decrement pin count
 
 void bt_unpinlatch (BtLatchSet *latch)
 {
-#ifdef unix
-       if( ~latch->pin & CLOCK_bit )
-               __sync_fetch_and_or(&latch->pin, CLOCK_bit);
-       __sync_fetch_and_add(&latch->pin, -1);
-#else
-       if( ~latch->pin & CLOCK_bit )
-               _InterlockedOr16 (&latch->pin, CLOCK_bit);
-       _InterlockedDecrement16 (&latch->pin);
-#endif
+       bt_mutexlock(latch->modify);
+       latch->pin |= CLOCK_bit;
+       latch->pin--;
+       bt_releasemutex(latch->modify);
 }
 
 //  return the btree cached page address
@@ -870,203 +1002,294 @@ BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
 {
 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
 
-  if( latch->dirty )
-       if( page->lsn < bt->mgr->flushlsn )
-         if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
-               return NULL;
-         else
-               latch->dirty = 0, bt->writes++;
-
   return page;
 }
 
-//     find existing latchset or inspire new one
-//     return with latchset pinned
+//  return next available latch entry
+//       and with latch entry locked
 
-BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
+uint bt_availnext (BtDb *bt)
 {
-uint hashidx = page_no % bt->mgr->latchhash;
 BtLatchSet *latch;
-uint attempts = 0;
-uint slot, idx;
-uint lvl, cnt;
-off64_t off;
-uint amt[1];
-BtPage page;
+uint entry;
 
-  //  try to find our entry
+  while( 1 ) {
+#ifdef unix
+       entry = __sync_fetch_and_add (&bt->mgr->latchavail, 1) + 1;
+#else
+       entry = _InterlockedIncrement (&bt->mgr->latchavail);
+#endif
+       entry %= bt->mgr->latchtotal;
 
-  bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
+       if( !entry )
+               continue;
 
-  if( slot = bt->mgr->hashtable[hashidx].slot ) do
-  {
-       latch = bt->mgr->latchsets + slot;
-       if( page_no == latch->page_no )
-               break;
-  } while( slot = latch->next );
+       latch = bt->mgr->latchsets + entry;
 
-  //  found our entry
-  //  increment clock
+       if( !latch->avail )
+               continue;
 
-  if( slot ) {
-       latch = bt->mgr->latchsets + slot;
-#ifdef unix
-       __sync_fetch_and_add(&latch->pin, 1);
-#else
-       _InterlockedIncrement16 (&latch->pin);
-#endif
-       bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
-       return latch;
+       bt_mutexlock(latch->modify);
+
+       if( !latch->avail ) {
+               bt_releasemutex(latch->modify);
+               continue;
+       }
+
+       return entry;
   }
+}
 
-       //  see if there are any unused pool entries
-#ifdef unix
-       slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
-#else
-       slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
-#endif
+//     find and add the next available latch entry
+//     to the queue
 
-       if( slot < bt->mgr->latchtotal ) {
-               latch = bt->mgr->latchsets + slot;
-               if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
-                       return NULL;
-               bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
-               return latch;
-       }
+BTERR bt_availlatch (BtDb *bt)
+{
+BtLatchSet *latch;
+uint startattempt;
+uint cnt, entry;
+uint hashidx;
+BtPage page;
 
-#ifdef unix
-       __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
-#else
-       _InterlockedDecrement (&bt->mgr->latchdeployed);
-#endif
   //  find and reuse previous entry on victim
 
+  startattempt = bt->mgr->latchvictim;
+
   while( 1 ) {
 #ifdef unix
-       slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
+       entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
 #else
-       slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
+       entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
 #endif
-       // try to get write lock on hash chain
-       //      skip entry if not obtained
-       //      or has outstanding pins
+       //      skip entry if it has outstanding pins
 
-       slot %= bt->mgr->latchtotal;
+       entry %= bt->mgr->latchtotal;
 
-       if( !slot )
+       if( !entry )
                continue;
 
-       //      only go around two times before
+       //      only go around one time before
        //      flushing redo recovery buffer,
-       //      and the buffer pool.
+       //      and the buffer pool to free up entries.
 
        if( bt->mgr->redopages )
-         if( attempts++ > 2 * bt->mgr->latchtotal ) {
-               if( bt_dumpredo (bt) )
-                 return NULL;
-               bt_flushlsn (bt);
-               attempts = 0;
+         if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
+           if( bt_mutextry (bt->mgr->dump) ) {
+                if( bt_dumpredo (bt) )
+                 return bt->err;
+                bt_flushlsn (bt);
+               // synchronize the various threads running into this condition
+               //      so that only one thread does the dump and flush
+               } else
+                       bt_mutexlock(bt->mgr->dump);
+
+               startattempt = bt->mgr->latchvictim;
+               bt_releasemutex(bt->mgr->dump);
          }
 
-       latch = bt->mgr->latchsets + slot;
-       idx = latch->page_no % bt->mgr->latchhash;
+       latch = bt->mgr->latchsets + entry;
 
-       //      see we are on same chain as hashidx
+       if( latch->avail )
+         continue;
 
-       if( idx == hashidx )
-               continue;
+       bt_mutexlock(latch->modify);
 
-       if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
-               continue;
+       //      skip if already an available entry
+
+       if( latch->avail ) {
+         bt_releasemutex(latch->modify);
+         continue;
+       }
 
-       //  skip this slot if it is pinned
-       //      or the CLOCK bit is set
+       //  skip this entry if it is pinned
+       //      if the CLOCK bit is set
+       //      reset it to zero.
 
        if( latch->pin ) {
-         if( latch->pin & CLOCK_bit ) {
-#ifdef unix
-               __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
-#else
-               _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
-#endif
-         }
-         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
+         latch->pin &= ~CLOCK_bit;
+         bt_releasemutex(latch->modify);
          continue;
        }
 
-       page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
+       page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
 
        //      if dirty page has lsn >= last redo recovery buffer
-       //      then hold page in the buffer pool until redo
-       //      recovery buffer is written to disk.
+       //      then hold page in the buffer pool until next redo
+       //      recovery buffer is being written to disk.
 
        if( latch->dirty )
          if( page->lsn >= bt->mgr->flushlsn ) {
-               bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
+           bt_releasemutex(latch->modify);
                continue;
          }
-               
-       //  update permanent page area in btree from buffer pool
-       //      no read-lock is required since page is not pinned.
 
-       if( latch->dirty )
-         if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
-               return NULL;
-         else
-               latch->dirty = 0, bt->writes++;
-
-       //  unlink our available slot from its hash chain
+       //      entry is available
+#ifdef unix
+       __sync_fetch_and_add (&bt->mgr->available, 1);
+#else
+       _InterlockedIncrement(&bt->mgr->available);
+#endif
+       latch->avail = 1;
+       bt_releasemutex(latch->modify);
+       return 0;
+  }
+}
 
-       if( latch->prev )
-               bt->mgr->latchsets[latch->prev].next = latch->next;
-       else
-               bt->mgr->hashtable[idx].slot = latch->next;
+//     release available latch requests
 
-       if( latch->next )
-               bt->mgr->latchsets[latch->next].prev = latch->prev;
+void bt_availrelease (BtDb *bt, uint count)
+{
+#ifdef unix
+       __sync_fetch_and_add(bt->mgr->availlock, -count);
+#else
+       _InterlockedAdd(bt->mgr->availlock, -count);
+#endif
+}
 
-       bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
+//     commit available chain entries
+//     find available entries as required
 
-       if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
-               return NULL;
+void bt_availrequest (BtDb *bt, uint count)
+{
+#ifdef unix
+       __sync_fetch_and_add(bt->mgr->availlock, count);
+#else
+       _InterlockedAdd(bt->mgr->availlock, count);
+#endif
 
-       bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
-       return latch;
-  }
+       while( *bt->mgr->availlock > bt->mgr->available )
+               bt_availlatch (bt);
 }
 
-// flush pages
+//     find available latchset
+//     return with latchset pinned
 
-void bt_flushlsn (BtDb *bt)
+BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, BtPage loadit)
 {
+uint hashidx = page_no % bt->mgr->latchhash;
 BtLatchSet *latch;
-uint hashidx;
-uint num = 0;
+uint entry, idx;
 BtPage page;
-uint slot;
 
-       //      flush dirty pool pages to the btree that were
-       //      dirty before the start of this redo recovery buffer
+  //  try to find our entry
 
-       for( slot = 1; slot <= bt->mgr->latchdeployed; slot++ ) {
-               page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
-               latch = bt->mgr->latchsets + slot;
-//             hashidx = latch->page_no % bt->mgr->latchhash;
+  bt_mutexlock(bt->mgr->hashtable[hashidx].latch);
 
-//             if( !bt_spinwritetry (bt->mgr->hashtable[hashidx].latch) )
-//                     continue;
+  if( entry = bt->mgr->hashtable[hashidx].entry ) do
+  {
+       latch = bt->mgr->latchsets + entry;
+       if( page_no == latch->page_no )
+               break;
+  } while( entry = latch->next );
 
-               if( latch->dirty ) {
-                 bt_lockpage(bt, BtLockRead, latch);
-                 bt_writepage(bt->mgr, page, latch->page_no);
-                 latch->dirty = 0, bt->writes++;
-                 bt_unlockpage(bt, BtLockRead, latch);
-               }
+  //  found our entry: increment pin
+  //  remove from available status
 
-//         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
+  if( entry ) {
+       latch = bt->mgr->latchsets + entry;
+       bt_mutexlock(latch->modify);
+       if( latch->avail )
+#ifdef unix
+         __sync_fetch_and_add (&bt->mgr->available, -1);
+#else
+         _InterlockedDecrement(&bt->mgr->available);
+#endif
+       latch->avail = 0;
+       latch->pin |= CLOCK_bit;
+       latch->pin++;
+
+       bt_releasemutex(latch->modify);
+       bt_releasemutex(bt->mgr->hashtable[hashidx].latch);
+       return latch;
+  }
+
+  //  find and reuse entry from available set
+
+trynext:
+
+  if( entry = bt_availnext (bt) )
+       latch = bt->mgr->latchsets + entry;
+  else
+       return bt->line = __LINE__, bt->err = BTERR_avail, NULL;
+
+  idx = latch->page_no % bt->mgr->latchhash;
+
+  //  if latch is on a different hash chain
+  //   unlink from the old page_no chain
+
+  if( latch->page_no )
+   if( idx != hashidx ) {
+
+       //  skip over this entry if latch not available
+
+    if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
+         bt_releasemutex(latch->modify);
+         goto trynext;
        }
-}
 
+       if( latch->prev )
+         bt->mgr->latchsets[latch->prev].next = latch->next;
+       else
+         bt->mgr->hashtable[idx].entry = latch->next;
+
+       if( latch->next )
+         bt->mgr->latchsets[latch->next].prev = latch->prev;
+
+    bt_releasemutex (bt->mgr->hashtable[idx].latch);
+   }
+
+  //  remove available status
+
+  latch->avail = 0;
+#ifdef unix
+  __sync_fetch_and_add (&bt->mgr->available, -1);
+#else
+  _InterlockedDecrement(&bt->mgr->available);
+#endif
+  page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
+
+  if( latch->dirty )
+       if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
+         return bt->line = __LINE__, NULL;
+       else
+         latch->dirty = 0, bt->writes++;
+
+  if( loadit ) {
+       memcpy (page, loadit, bt->mgr->page_size);
+       latch->dirty = 1;
+  } else
+       if( bt->err = bt_readpage (bt->mgr, page, page_no) )
+         return bt->line = __LINE__, NULL;
+       else
+         bt->reads++;
+
+  //  link page as head of hash table chain
+  //  if this is a never before used entry,
+  //  or it was previously on a different
+  //  hash table chain. Otherwise, just
+  //  leave it in its current hash table
+  //  chain position.
+
+  if( !latch->page_no || hashidx != idx ) {
+       if( latch->next = bt->mgr->hashtable[hashidx].entry )
+         bt->mgr->latchsets[latch->next].prev = entry;
+
+       bt->mgr->hashtable[hashidx].entry = entry;
+    latch->prev = 0;
+  }
+
+  //  fill in latch structure
+
+  latch->pin = CLOCK_bit | 1;
+  latch->page_no = page_no;
+  latch->entry = entry;
+  latch->split = 0;
+
+  bt_releasemutex (latch->modify);
+  bt_releasemutex (bt->mgr->hashtable[hashidx].latch);
+  return latch;
+}
+  
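
bt_availlatch and bt_pinlatch above split page replacement into two stages: a clock sweep that gives pinned or recently referenced entries a second chance by clearing CLOCK_bit and marks true victims avail, and a consumer that takes an avail entry, relinks it onto the correct hash chain, and pins it. A small self-contained sketch of just the second-chance decision, with a simplified entry type standing in for BtLatchSet:

#define CLOCK_BIT 0x8000		/* plays the role of CLOCK_bit */

typedef struct {			/* simplified stand-in for BtLatchSet */
	volatile unsigned short pin;	/* CLOCK_BIT | count of accessing threads */
	volatile unsigned char avail;	/* already on the available set */
	volatile unsigned char dirty;	/* caller writes the page out before reuse */
} Entry;

/* return 1 if the clock hand may reclaim this entry, 0 to skip it */

int second_chance (Entry *e)
{
	if (e->avail)			/* a previous sweep already freed it */
		return 0;

	if (e->pin) {			/* pinned or recently referenced: */
		e->pin &= ~CLOCK_BIT;	/* take away its reference bit and move on */
		return 0;
	}

	return 1;			/* unpinned and unreferenced: reclaimable */
}
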
 void bt_mgrclose (BtMgr *mgr)
 {
 BtLatchSet *latch;
@@ -1075,7 +1298,10 @@ uint num = 0;
 BtPage page;
 uint slot;
 
-       //      flush recovery buffer to disk
+       //      flush previously written dirty pages
+       //      and write recovery buffer to disk
+
+       fdatasync (mgr->idx);
 
        if( mgr->redoend ) {
                eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
@@ -1084,9 +1310,9 @@ uint slot;
                pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
        }
 
-       //      flush dirty pool pages to the btree
+       //      write remaining dirty pool pages to the btree
 
-       for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
+       for( slot = 1; slot < mgr->latchtotal; slot++ ) {
                page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
                latch = mgr->latchsets + slot;
 
@@ -1096,15 +1322,28 @@ uint slot;
                }
        }
 
+       //      flush last batch to disk and clear
+       //      redo recovery buffer on disk.
+
+       fdatasync (mgr->idx);
+
+       eof = (BtLogHdr *)mgr->redobuff;
+       memset (eof, 0, sizeof(BtLogHdr));
+       eof->lsn = mgr->lsn;
+
+       pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
+
+       sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+
        fprintf(stderr, "%d buffer pool pages flushed\n", num);
 
 #ifdef unix
-       munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
+       munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
        munmap (mgr->pagezero, mgr->page_size);
 #else
        FlushViewOfFile(mgr->pagezero, 0);
        UnmapViewOfFile(mgr->pagezero);
-       UnmapViewOfFile(mgr->hashtable);
+       UnmapViewOfFile(mgr->pagepool);
        CloseHandle(mgr->halloc);
        CloseHandle(mgr->hpool);
 #endif
@@ -1146,6 +1385,7 @@ uint nlatchpage, latchhash;
 unsigned char value[BtId];
 int flag, initit = 0;
 BtPageZero *pagezero;
+BtLatchSet *latch;
 off64_t size;
 uint amt[1];
 BtMgr* mgr;
@@ -1217,11 +1457,10 @@ BtVal *val;
 
        //  calculate number of latch hash table entries
 
-       mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
-       mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
+       mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
 
        mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
-       mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
+       mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
        mgr->latchtotal = nodemax;
 
        if( !initit )
@@ -1287,8 +1526,8 @@ mgrlatch:
        }
        mlock (mgr->pagezero, mgr->page_size);
 
-       mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
-       if( mgr->hashtable == MAP_FAILED ) {
+       mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
+       if( mgr->pagepool == MAP_FAILED ) {
                fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
        }
@@ -1318,8 +1557,8 @@ mgrlatch:
        }
 
        flag = FILE_MAP_WRITE;
-       mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
-       if( !mgr->hashtable ) {
+       mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
+       if( !mgr->pagepool ) {
                fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
                return bt_mgrclose (mgr), NULL;
        }
@@ -1327,8 +1566,17 @@ mgrlatch:
                mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
 #endif
 
-       mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
-       mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
+       mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
+       mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
+       mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
+
+       //      mark all pool entries as available
+
+       for( idx = 1; idx < mgr->latchtotal; idx++ ) {
+               latch = mgr->latchsets + idx;
+               latch->avail = 1;
+               mgr->available++;
+       }
 
        return mgr;
 }
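
The bt_mgr changes above carve the single anonymous mapping in a fixed order: latchtotal page frames for the buffer pool, then the BtLatchSet array, and the hash table gets whatever space remains, which is also where latchhash now comes from. The same arithmetic as a self-contained sketch, with stand-in struct types in place of BtLatchSet and BtHashEntry:

#include <stdint.h>
#include <stddef.h>

typedef struct { uint64_t page_no; } LatchSet;	/* stand-in */
typedef struct { uint32_t entry;   } HashSlot;	/* stand-in */

size_t hash_slots (unsigned char *map, uint64_t nlatchpage,
		   uint64_t latchtotal, unsigned page_bits)
{
	unsigned char *end  = map + (nlatchpage << page_bits);
	LatchSet *latchsets = (LatchSet *)(map + (latchtotal << page_bits));
	HashSlot *hashtable = (HashSlot *)(latchsets + latchtotal);

	/* number of hash table slots that fit in the leftover space */
	return (size_t)(end - (unsigned char *)hashtable) / sizeof(HashSlot);
}
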
@@ -1449,19 +1697,19 @@ int blk;
 
        //      lock allocation page
 
-       bt_spinwritelock(bt->mgr->lock);
+       bt_mutexlock(bt->mgr->lock);
 
        // use empty chain first
        // else allocate empty page
 
        if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
-               if( set->latch = bt_pinlatch (bt, page_no, 1) )
+               if( set->latch = bt_pinlatch (bt, page_no, NULL) )
                        set->page = bt_mappage (bt, set->latch);
                else
-                       return bt->err = BTERR_struct, -1;
+                       return bt->err = BTERR_struct, bt->line = __LINE__, -1;
 
                bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
-               bt_spinreleasewrite(bt->mgr->lock);
+               bt_releasemutex(bt->mgr->lock);
 
                memcpy (set->page, contents, bt->mgr->page_size);
                set->latch->dirty = 1;
@@ -1471,18 +1719,18 @@ int blk;
        page_no = bt_getid(bt->mgr->pagezero->alloc->right);
        bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
 
-       // unlock allocation latch
+       // unlock allocation latch and
+       //      extend file into new page.
 
-       bt_spinreleasewrite(bt->mgr->lock);
+       bt_releasemutex(bt->mgr->lock);
 
        //      don't load cache from btree page
 
-       if( set->latch = bt_pinlatch (bt, page_no, 0) )
+       if( set->latch = bt_pinlatch (bt, page_no, contents) )
                set->page = bt_mappage (bt, set->latch);
        else
-               return bt->err = BTERR_struct;
+               return bt->err;
 
-       memcpy (set->page, contents, bt->mgr->page_size);
        set->latch->dirty = 1;
        return 0;
 }
@@ -1529,6 +1777,7 @@ uid page_no = ROOT_page, prevpage = 0;
 uint drill = 0xff, slot;
 BtLatchSet *prevlatch;
 uint mode, prevmode;
+BtVal *val;
 
   //  start at root of btree and drill down
 
@@ -1536,7 +1785,7 @@ uint mode, prevmode;
        // determine lock mode of drill level
        mode = (drill == lvl) ? lock : BtLockRead; 
 
-       if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
+       if( !(set->latch = bt_pinlatch (bt, page_no, NULL)) )
          return 0;
 
        // obtain access lock using lock chaining with Access mode
@@ -1559,7 +1808,7 @@ uint mode, prevmode;
        bt_lockpage(bt, mode, set->latch);
 
        if( set->page->free )
-               return bt->err = BTERR_struct, 0;
+               return bt->err = BTERR_struct, bt->line = __LINE__, 0;
 
        if( page_no > ROOT_page )
          bt_unlockpage(bt, BtLockAccess, set->latch);
@@ -1568,7 +1817,7 @@ uint mode, prevmode;
 
        if( set->page->lvl != drill) {
                if( set->latch->page_no != ROOT_page )
-                       return bt->err = BTERR_struct, 0;
+                       return bt->err = BTERR_struct, bt->line = __LINE__, 0;
                        
                drill = set->page->lvl;
 
@@ -1597,21 +1846,27 @@ uint mode, prevmode;
                if( slot++ < set->page->cnt )
                  continue;
                else
-                 return bt->err = BTERR_struct, 0;
+                 return bt->err = BTERR_struct, bt->line = __LINE__, 0;
+
+         val = valptr(set->page, slot);
+
+         if( val->len == BtId )
+               page_no = bt_getid(valptr(set->page, slot)->value);
+         else
+               return bt->line = __LINE__, bt->err = BTERR_struct, 0;
 
-         page_no = bt_getid(valptr(set->page, slot)->value);
          drill--;
          continue;
         }
 
-       //  or slide right into next page
+       //  slide right into next page
 
        page_no = bt_getid(set->page->right);
   } while( page_no );
 
   // return error on end of right chain
 
-  bt->err = BTERR_struct;
+  bt->line = __LINE__, bt->err = BTERR_struct;
   return 0;    // return error
 }
 
@@ -1622,7 +1877,7 @@ void bt_freepage (BtDb *bt, BtPageSet *set)
 {
        //      lock allocation page
 
-       bt_spinwritelock (bt->mgr->lock);
+       bt_mutexlock (bt->mgr->lock);
 
        //      store chain
 
@@ -1639,7 +1894,7 @@ void bt_freepage (BtDb *bt, BtPageSet *set)
 
        // unlock allocation page
 
-       bt_spinreleasewrite (bt->mgr->lock);
+       bt_releasemutex (bt->mgr->lock);
 }
 
 //     a fence key was deleted from a page
@@ -1694,6 +1949,7 @@ BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
 {
 BtPageSet child[1];
 uid page_no;
+BtVal *val;
 uint idx;
 
   // find the child entry and promote as new root contents
@@ -1703,9 +1959,14 @@ uint idx;
          if( !slotptr(root->page, idx)->dead )
                break;
 
-       page_no = bt_getid (valptr(root->page, idx)->value);
+       val = valptr(root->page, idx);
 
-       if( child->latch = bt_pinlatch (bt, page_no, 1) )
+       if( val->len == BtId )
+               page_no = bt_getid (valptr(root->page, idx)->value);
+       else
+               return bt->line = __LINE__, bt->err = BTERR_struct;
+
+       if( child->latch = bt_pinlatch (bt, page_no, NULL) )
                child->page = bt_mappage (bt, child->latch);
        else
                return bt->err;
@@ -1748,7 +2009,7 @@ BtKey *ptr;
 
        page_no = bt_getid(set->page->right);
 
-       if( right->latch = bt_pinlatch (bt, page_no, 1) )
+       if( right->latch = bt_pinlatch (bt, page_no, NULL) )
                right->page = bt_mappage (bt, right->latch);
        else
                return 0;
@@ -1761,7 +2022,7 @@ BtKey *ptr;
        memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
 
        if( right->page->kill )
-               return bt->err = BTERR_struct;
+               return bt->line = __LINE__, bt->err = BTERR_struct;
 
        // pull contents of right peer into our empty page
 
@@ -1895,12 +2156,12 @@ uid page_no;
        prevlatch = set->latch;
 
        if( page_no = bt_getid(set->page->right) )
-         if( set->latch = bt_pinlatch (bt, page_no, 1) )
+         if( set->latch = bt_pinlatch (bt, page_no, NULL) )
                set->page = bt_mappage (bt, set->latch);
          else
                return 0;
        else
-         return bt->err = BTERR_struct, 0;
+         return bt->err = BTERR_struct, bt->line = __LINE__, 0;
 
        // obtain access lock using lock chaining with Access mode
 
@@ -2004,6 +2265,7 @@ BtVal *val;
 
        memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
        set->latch->dirty = 1;
+
        page->garbage = 0;
        page->act = 0;
 
@@ -2197,6 +2459,8 @@ uint prev;
        if( bt_newpage(bt, right, bt->frame) )
                return 0;
 
+       // process lower keys
+
        memcpy (bt->frame, set->page, bt->mgr->page_size);
        memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
        set->latch->dirty = 1;
@@ -2410,7 +2674,7 @@ uint type;
                ptr = keyptr(set->page, slot);
        else {
                if( !bt->err )
-                       bt->err = BTERR_ovflw;
+                       bt->line = __LINE__, bt->err = BTERR_ovflw;
                return bt->err;
        }
 
@@ -2495,6 +2759,7 @@ uint type;
 }
 
 typedef struct {
+       logseqno reqlsn;        // redo log seq no required
        logseqno lsn;           // redo log sequence number
        uint entry;                     // latch table entry number
        uint slot:31;           // page slot number
@@ -2548,7 +2813,7 @@ uint entry;
                }
        } while( entry = set->latch->split );
 
-       bt->err = BTERR_atomic;
+       bt->line = __LINE__, bt->err = BTERR_atomic;
        return 0;
 }
 
@@ -2582,21 +2847,21 @@ uint entry, slot;
        locks[src].slot = 0;
   }
 
-  return bt->err = BTERR_atomic;
+  return bt->line = __LINE__, bt->err = BTERR_atomic;
 }
 
 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
 {
 BtKey *key = keyptr(source, src);
-uint idx, entry, slot;
 BtPageSet set[1];
+uint idx, slot;
 BtKey *ptr;
 BtVal *val;
 
        if( slot = bt_atomicpage (bt, source, locks, src, set) )
          ptr = keyptr(set->page, slot);
        else
-         return bt->err = BTERR_struct;
+         return bt->line = __LINE__, bt->err = BTERR_struct;
 
        if( !keycmp (ptr, key->key, key->len) )
          if( !slotptr(set->page, slot)->dead )
@@ -2608,8 +2873,8 @@ BtVal *val;
 
        val = valptr(set->page, slot);
        set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
-       set->page->lsn = locks[src].lsn;
        set->latch->dirty = 1;
+       set->page->lsn = locks[src].lsn;
        set->page->act--;
        bt->found++;
        return 0;
@@ -2633,7 +2898,7 @@ BtKey *ptr;
 
        //      grab the right sibling
 
-       if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
+       if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), NULL) )
                right->page = bt_mappage (bt, right->latch);
        else
                return bt->err;
@@ -2673,7 +2938,7 @@ BtKey *ptr;
         //      to remove scanner's pointer to the right page
 
        if( right_page_no = bt_getid (prev->page->right) ) {
-         if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
+         if( temp->latch = bt_pinlatch (bt, right_page_no, NULL) )
                temp->page = bt_mappage (bt, temp->latch);
 
          bt_lockpage (bt, BtLockWrite, temp->latch);
@@ -2683,9 +2948,9 @@ BtKey *ptr;
          bt_unlockpage (bt, BtLockWrite, temp->latch);
          bt_unpinlatch (temp->latch);
        } else {        // master is now the far right page
-         bt_spinwritelock (bt->mgr->lock);
+         bt_mutexlock (bt->mgr->lock);
          bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
-         bt_spinreleasewrite(bt->mgr->lock);
+         bt_releasemutex(bt->mgr->lock);
        }
 
        //      now that there are no pointers to the right page
@@ -2708,7 +2973,7 @@ BtKey *ptr;
 
 int bt_atomictxn (BtDb *bt, BtPage source)
 {
-uint src, idx, slot, samepage, entry;
+uint src, idx, slot, samepage, entry, avail, que = 0;
 AtomicKey *head, *tail, *leaf;
 BtPageSet set[1], prev[1];
 unsigned char value[BtId];
@@ -2717,6 +2982,7 @@ BtLatchSet *latch;
 AtomicTxn *locks;
 int result = 0;
 BtSlot temp[1];
+logseqno lsn;
 BtPage page;
 BtVal *val;
 uid right;
@@ -2743,6 +3009,11 @@ int type;
        }
   }
 
+  // reserve enough buffer pool entries
+
+  avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;
+  bt_availrequest (bt, avail);
+
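
bt_availrequest above commits a worst-case number of pool entries, three per source key plus one for each level of the tree, before any page is touched, looping in bt_availlatch until the count of avail entries covers every outstanding commitment; bt_availrelease drops the commitment afterwards. A usage sketch of that bracketing with the patch's own calls (error handling omitted, count as in the line above):

	uint avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;

	bt_availrequest (bt, avail);	//  returns once mgr->available covers all commitments

	//  ... pin and update the pages for this batch ...

	bt_availrelease (bt, avail);	//  give the reservation back
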
   // Load the leaf page for each key
   // group same page references with reuse bit
   // and determine any constraint violations
@@ -2782,6 +3053,12 @@ int type;
          locks[src].reuse = 1;
        }
 
+       //      capture current lsn for master page
+
+       locks[src].reqlsn = set->page->lsn;
+
+       //      perform constraint checks
+
        switch( slotptr(source, src)->type ) {
        case Duplicate:
        case Unique:
@@ -2816,29 +3093,12 @@ int type;
 
   //  and add entries to redo log
 
-  for( src = 0; src++ < source->cnt; ) {
-       key = keyptr(source, src);
-       val = valptr(source, src);
-       switch( slotptr(source, src)->type ) {
-       case Unique:
-               type = BTRM_add;
-               break;
-       case Duplicate:
-               type = BTRM_dup;
-               break;
-       case Delete:
-               type = BTRM_del;
-               break;
-       case Update:
-               type = BTRM_upd;
-               break;
-       }
-
-       if( locks[src].lsn = bt_newredo (bt, type, 0, key, val) )
-               continue;
-
-       goto atomicerr;
-  }
+  if( bt->mgr->redopages )
+   if( lsn = bt_txnredo (bt, source) )
+    for( src = 0; src++ < source->cnt; )
+        locks[src].lsn = lsn;
+    else
+        goto atomicerr;
 
   //  obtain write lock for each master page
 
@@ -2928,7 +3188,7 @@ int type;
          //  schedule prev fence key update
 
          ptr = keyptr(prev->page,prev->page->cnt);
-         leaf = calloc (sizeof(AtomicKey), 1);
+         leaf = calloc (sizeof(AtomicKey), 1), que++;
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
          leaf->page_no = prev->latch->page_no;
@@ -2958,7 +3218,7 @@ int type;
          //  far right sibling or set rightmost page in page zero
 
          if( right = bt_getid (prev->page->right) ) {
-               if( set->latch = bt_pinlatch (bt, right, 1) )
+               if( set->latch = bt_pinlatch (bt, right, NULL) )
                  set->page = bt_mappage (bt, set->latch);
                else
                  goto atomicerr;
@@ -2969,15 +3229,15 @@ int type;
            bt_unlockpage (bt, BtLockWrite, set->latch);
                bt_unpinlatch (set->latch);
          } else {      // prev is rightmost page
-           bt_spinwritelock (bt->mgr->lock);
+           bt_mutexlock (bt->mgr->lock);
                bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
-           bt_spinreleasewrite(bt->mgr->lock);
+           bt_releasemutex(bt->mgr->lock);
          }
 
          //  Process last page split in chain
 
          ptr = keyptr(prev->page,prev->page->cnt);
-         leaf = calloc (sizeof(AtomicKey), 1);
+         leaf = calloc (sizeof(AtomicKey), 1), que++;
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
          leaf->page_no = prev->latch->page_no;
@@ -3023,7 +3283,7 @@ int type;
        //      perform the remainder of the delete
        //      from the FIFO queue
 
-       leaf = calloc (sizeof(AtomicKey), 1);
+       leaf = calloc (sizeof(AtomicKey), 1), que++;
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
        leaf->page_no = prev->latch->page_no;
@@ -3044,6 +3304,11 @@ int type;
        bt_unlockpage(bt, BtLockWrite, prev->latch);
   }
 
+  bt_availrelease (bt, avail);
+
+  que *= bt->mgr->pagezero->alloc->lvl;
+  bt_availrequest (bt, que);
+
   //  add & delete keys for any pages split or merged during transaction
 
   if( leaf = head )
@@ -3082,6 +3347,8 @@ int type;
          free (leaf);
        } while( leaf = tail );
 
+  bt_availrelease (bt, que);
+
   // return success
 
   free (locks);
@@ -3097,7 +3364,7 @@ uint bt_lastkey (BtDb *bt)
 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
 BtPageSet set[1];
 
-       if( set->latch = bt_pinlatch (bt, page_no, 1) )
+       if( set->latch = bt_pinlatch (bt, page_no, NULL) )
                set->page = bt_mappage (bt, set->latch);
        else
                return 0;
@@ -3130,7 +3397,7 @@ goleft:
 findourself:
        bt->cursor_page = next;
 
-       if( set->latch = bt_pinlatch (bt, next, 1) )
+       if( set->latch = bt_pinlatch (bt, next, NULL) )
                set->page = bt_mappage (bt, set->latch);
        else
                return 0;
@@ -3178,7 +3445,7 @@ uid right;
 
        bt->cursor_page = right;
 
-       if( set->latch = bt_pinlatch (bt, right, 1) )
+       if( set->latch = bt_pinlatch (bt, right, NULL) )
                set->page = bt_mappage (bt, set->latch);
        else
                return 0;
@@ -3293,102 +3560,29 @@ struct timeval tv[1];
 void bt_poolaudit (BtMgr *mgr)
 {
 BtLatchSet *latch;
-uint slot = 0;
-
-       while( slot++ < mgr->latchdeployed ) {
-               latch = mgr->latchsets + slot;
-
-               if( *latch->readwr->rin & MASK )
-                       fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
-               memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
-
-               if( *latch->access->rin & MASK )
-                       fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
-               memset ((ushort *)latch->access, 0, sizeof(RWLock));
-
-               if( *latch->parent->ticket != *latch->parent->serving )
-                       fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
-               memset ((ushort *)latch->parent, 0, sizeof(RWLock));
-
-               if( latch->pin & ~CLOCK_bit ) {
-                       fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
-                       latch->pin = 0;
-               }
-       }
-}
-
-uint bt_latchaudit (BtDb *bt)
-{
-ushort idx, hashidx;
-uid next, page_no;
-BtLatchSet *latch;
-uint cnt = 0;
-BtKey *ptr;
+uint entry = 0;
 
-       if( *(ushort *)(bt->mgr->lock) )
-               fprintf(stderr, "Alloc page locked\n");
-       *(ushort *)(bt->mgr->lock) = 0;
+       while( ++entry < mgr->latchtotal ) {
+               latch = mgr->latchsets + entry;
 
-       for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
-               latch = bt->mgr->latchsets + idx;
                if( *latch->readwr->rin & MASK )
-                       fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
-               memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
+                       fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
 
                if( *latch->access->rin & MASK )
-                       fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
-               memset ((ushort *)latch->access, 0, sizeof(RWLock));
-
-               if( *latch->parent->ticket != *latch->parent->serving )
-                       fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
-               memset ((ushort *)latch->parent, 0, sizeof(RWLock));
-
-               if( latch->pin ) {
-                       fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
-                       latch->pin = 0;
-               }
-       }
-
-       for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
-         if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
-                       fprintf(stderr, "hash entry %d locked\n", hashidx);
-
-         *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
-
-         if( idx = bt->mgr->hashtable[hashidx].slot ) do {
-               latch = bt->mgr->latchsets + idx;
-               if( latch->pin )
-                       fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
-         } while( idx = latch->next );
-       }
-
-       page_no = LEAF_page;
+                       fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
 
-       while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
-       uid off = page_no << bt->mgr->page_bits;
-#ifdef unix
-         pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
-#else
-       DWORD amt[1];
+               if( *latch->parent->exclusive )
+                       fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
 
-         SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
+               if( *latch->atomic->exclusive )
+                       fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
 
-         if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
-               return bt->err = BTERR_map;
+               if( *latch->modify->exclusive )
+                       fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
 
-         if( *amt <  bt->mgr->page_size )
-               return bt->err = BTERR_map;
-#endif
-               if( !bt->frame->free && !bt->frame->lvl )
-                       cnt += bt->frame->act;
-               page_no++;
+               if( latch->pin & ~CLOCK_bit )
+                       fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
        }
-               
-       cnt--;  // remove stopper key
-       fprintf(stderr, " Total keys read %d\n", cnt);
-
-       bt_close (bt);
-       return 0;
 }
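bt_poolaudit now only reports latches that are still locked or pinned, checking the exclusive futex word of each write-only lock directly. A typical place to run it, assuming the usual pthread driver at the bottom of this file (variable names here are illustrative), is after every index thread has joined and just before the manager is closed:

	//	illustrative shutdown sequence
	for( idx = 0; idx < cnt; idx++ )
		pthread_join (threads[idx], NULL);

	bt_poolaudit (mgr);		// report latches still held or pinned
	bt_mgrclose (mgr);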
 
 typedef struct {
@@ -3432,12 +3626,6 @@ FILE *in;
 
        switch(ch | 0x20)
        {
-       case 'a':
-               fprintf(stderr, "started latch mgr audit\n");
-               cnt = bt_latchaudit (bt);
-               fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
-               break;
-
        case 'd':
                type = Delete;
 
@@ -3464,7 +3652,7 @@ FILE *in;
 
                          if( !args->num ) {
                            if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
-                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
+                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
                            len = 0;
                                continue;
                          }
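The driver's error reports now include bt->line, the line in the btree source that raised the error, alongside the input line number. A hypothetical helper that would capture it (the actual mechanism in threadskv9c.c may differ, e.g. the assignment written out longhand at each error site):

//	illustrative only -- records where in the source an error was raised
#define bt_seterr(bt, code) ( (bt)->line = __LINE__, (bt)->err = (code) )

//	usage at an error site, e.g. a failed page read:
//		return bt_seterr (bt, BTERR_map);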
@@ -3489,7 +3677,7 @@ FILE *in;
                          page->min = nxt;
 
                          if( bt_atomictxn (bt, page) )
-                               fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
+                               fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
                          nxt = sizeof(txn);
                          cnt = 0;
 
@@ -3508,7 +3696,7 @@ FILE *in;
                          line++;
 
                          if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
-                               fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
+                               fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
                          len = 0;
                        }
                        else if( len < BT_maxkey )
@@ -3526,7 +3714,7 @@ FILE *in;
                          if( bt_findkey (bt, key, len, NULL, 0) == 0 )
                                found++;
                          else if( bt->err )
-                               fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
+                               fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->err, errno, bt->line, line), exit(0);
                          len = 0;
                        }
                        else if( len < BT_maxkey )
@@ -3536,8 +3724,9 @@ FILE *in;
 
        case 's':
                fprintf(stderr, "started scanning\n");
+               
                do {
-                       if( set->latch = bt_pinlatch (bt, page_no, 1) )
+                       if( set->latch = bt_pinlatch (bt, page_no, NULL) )
                                set->page = bt_mappage (bt, set->latch);
                        else
                                fprintf(stderr, "unable to obtain latch"), exit(1);
@@ -3560,6 +3749,7 @@ FILE *in;
                                cnt++;
                           }
 
+                       set->latch->avail = 1;
                        bt_unlockpage (bt, BtLockRead, set->latch);
                        bt_unpinlatch (set->latch);
                } while( page_no = next );
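Setting set->latch->avail before unpinning marks a page that was touched only by the sequential scan as an immediate candidate for reuse. A victim search that honours the hint might look like the following; this is purely illustrative, using only the latch fields visible above, and is not the actual replacement logic in threadskv9c.c:

//	illustrative victim search favouring avail latches
BtLatchSet *bt_findvictim (BtMgr *mgr)
{
BtLatchSet *latch;
uint entry;

	//	recycle scan pages before falling back to the clock sweep
	for( entry = 1; entry < mgr->latchtotal; entry++ ) {
		latch = mgr->latchsets + entry;

		if( latch->avail && !(latch->pin & ~CLOCK_bit) )
			return latch;
	}

	return NULL;
}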