]> pd.if.org Git - btree/blobdiff - btree2u.c
Remove debug code.
[btree] / btree2u.c
index 2e818ace951931bd7a2ee2fa3e5a9462491f709c..308c9f28ba82ea37791fa93b209b67841bff4758 100644 (file)
--- a/btree2u.c
+++ b/btree2u.c
@@ -43,6 +43,7 @@ REDISTRIBUTION OF THIS SOFTWARE.
 #include <stdlib.h>
 #include <time.h>
 #include <fcntl.h>
+#include <io.h>
 #endif
 
 #include <memory.h>
@@ -65,6 +66,17 @@ typedef unsigned int         uint;
 #define BT_minpage             (1 << BT_minbits)       // minimum page size
 #define BT_maxpage             (1 << BT_maxbits)       // maximum page size
 
+//  BTree page number constants
+#define ALLOC_page             0
+#define ROOT_page              1
+#define LEAF_page              2
+#define LATCH_page             3
+
+//     Number of levels to create in a new BTree
+
+#define MIN_lvl                        2
+#define MAX_lvl                        15
+
 /*
 There are five lock types for each node in three independent sets: 
 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
@@ -136,22 +148,25 @@ typedef struct BtPage_ {
        uint cnt;                                       // count of keys in page
        uint act;                                       // count of active keys
        uint min;                                       // next key offset
-       unsigned char bits:7;           // page size in bits
+       unsigned char bits:6;           // page size in bits
        unsigned char free:1;           // page is on free list
+       unsigned char dirty:1;          // page is dirty in cache
        unsigned char lvl:6;            // level of page
        unsigned char kill:1;           // page is being deleted
-       unsigned char dirty:1;          // page is dirty
+       unsigned char clean:1;          // page needs cleaning
        unsigned char right[BtId];      // page number to right
 } *BtPage;
 
 typedef struct {
        struct BtPage_ alloc[2];        // next & free page_nos in right ptr
        BtSpinLatch lock[1];            // allocation area lite latch
-       uint latchdeployed;                     // highest number of latch entries deployed
-       uint nlatchpage;                        // number of latch pages at BT_latch
-       uint latchtotal;                        // number of page latch entries
-       uint latchhash;                         // number of latch hash table slots
-       uint latchvictim;                       // next latch hash entry to examine
+       volatile uint latchdeployed;// highest number of latch entries deployed
+       volatile uint nlatchpage;       // number of latch pages at BT_latch
+       volatile uint latchtotal;       // number of page latch entries
+       volatile uint latchhash;        // number of latch hash table slots
+       volatile uint latchvictim;      // next latch hash entry to examine
+       volatile uint safelevel;        // safe page level in cache
+       volatile uint cache[MAX_lvl];// cache census counts by btree level
 } BtLatchMgr;
 
 //  latch hash table entries
@@ -168,12 +183,17 @@ typedef struct {
        BtSpinLatch readwr[1];          // read/write page lock
        BtSpinLatch access[1];          // Access Intent/Page delete
        BtSpinLatch parent[1];          // Posting of fence key in parent
-       volatile ushort dirty;          // page is dirty in cache
+       volatile ushort pin;            // number of pins/level/clock bits
        volatile uint next;                     // next entry in hash table chain
        volatile uint prev;                     // prev entry in hash table chain
-       volatile uint pin;                      // number of outstanding pins
 } BtLatchSet;
 
+#define CLOCK_mask 0xe000
+#define CLOCK_unit 0x2000
+#define PIN_mask 0x07ff
+#define LVL_mask 0x1800
+#define LVL_shift 11
+
 //     The object structure for Btree access
 
 typedef struct _BtDb {
@@ -183,15 +203,13 @@ typedef struct _BtDb {
        uid cursor_page;        // current cursor page number   
        int  err;
        uint mode;                      // read-write mode
-       BtPage alloc;           // frame buffer for alloc page ( page 0 )
        BtPage cursor;          // cached frame for start/next (never mapped)
        BtPage frame;           // spare frame for the page split (never mapped)
-       BtPage zero;            // zeroes frame buffer (never mapped)
-       BtPage page;            // current page
+       BtPage page;            // current mapped page in buffer pool
        BtLatchSet *latch;                      // current page latch
        BtLatchMgr *latchmgr;           // mapped latch page from allocation page
        BtLatchSet *latchsets;          // mapped latch set from latch pages
-       unsigned char *latchpool;       // cached page pool set
+       unsigned char *pagepool;        // cached page pool set
        BtHashEntry *table;     // the hash table
 #ifdef unix
        int idx;
@@ -199,7 +217,7 @@ typedef struct _BtDb {
        HANDLE idx;
        HANDLE halloc;          // allocation and latch table handle
 #endif
-       unsigned char *mem;     // frame, cursor, page memory buffer
+       unsigned char *mem;     // frame, cursor, memory buffers
        uint found;                     // last deletekey found key
 } BtDb;
 
@@ -227,7 +245,7 @@ extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_nextkey   (BtDb *bt, uint slot);
 
 //     internal functions
-BTERR bt_update (BtDb *bt, BtPage page, BtLatchSet *latch);
+void bt_update (BtDb *bt, BtPage page);
 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch);
 //  Helper functions to return slot values
 
@@ -237,16 +255,6 @@ extern uid bt_uid (BtDb *bt, uint slot);
 extern uint bt_tod (BtDb *bt, uint slot);
 #endif
 
-//  BTree page number constants
-#define ALLOC_page             0
-#define ROOT_page              1
-#define LEAF_page              2
-#define LATCH_page             3
-
-//     Number of levels to create in a new BTree
-
-#define MIN_lvl                        2
-
 //  The page is allocated from low and hi ends.
 //  The key offsets and row-id's are allocated
 //  from the bottom, while the text of the key
@@ -468,34 +476,98 @@ void bt_spinreleaseread(BtSpinLatch *latch)
 #endif
 }
 
+//     read page from permanent location in Btree file
+
+BTERR bt_readpage (BtDb *bt, BtPage page, uid page_no)
+{
+off64_t off = page_no << bt->page_bits;
+
+#ifdef unix
+       if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size ) {
+               fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
+               return bt->err = BTERR_read;
+       }
+#else
+OVERLAPPED ovl[1];
+uint amt[1];
+
+       memset (ovl, 0, sizeof(OVERLAPPED));
+       ovl->Offset = off;
+       ovl->OffsetHigh = off >> 32;
+
+       if( !ReadFile(bt->idx, page, bt->page_size, amt, ovl)) {
+               fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
+               return bt->err = BTERR_read;
+       }
+       if( *amt <  bt->page_size ) {
+               fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
+               return bt->err = BTERR_read;
+       }
+#endif
+       return 0;
+}
+
+//     write page to permanent location in Btree file
+//     clear the dirty bit
+
+BTERR bt_writepage (BtDb *bt, BtPage page, uid page_no)
+{
+off64_t off = page_no << bt->page_bits;
+
+#ifdef unix
+       page->dirty = 0;
+
+       if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
+               return bt->err = BTERR_wrt;
+#else
+OVERLAPPED ovl[1];
+uint amt[1];
+
+       memset (ovl, 0, sizeof(OVERLAPPED));
+       ovl->Offset = off;
+       ovl->OffsetHigh = off >> 32;
+       page->dirty = 0;
+
+       if( !WriteFile(bt->idx, page, bt->page_size, amt, ovl) )
+               return bt->err = BTERR_wrt;
+
+       if( *amt <  bt->page_size )
+               return bt->err = BTERR_wrt;
+#endif
+       return 0;
+}
+
 //     link latch table entry into head of latch hash table
 
 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no)
 {
-BtPage page = (BtPage)(slot * bt->page_size + bt->latchpool);
+BtPage page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
 BtLatchSet *latch = bt->latchsets + slot;
-off64_t off = page_no << bt->page_bits;
-uint amt[1];
+int lvl;
 
        if( latch->next = bt->table[hashidx].slot )
                bt->latchsets[latch->next].prev = slot;
 
        bt->table[hashidx].slot = slot;
        latch->page_no = page_no;
-       latch->dirty = 0;
        latch->prev = 0;
        latch->pin = 1;
+
+       if( bt_readpage (bt, page, page_no) )
+               return bt->err;
+
+       lvl = page->lvl << LVL_shift;
+       if( lvl > LVL_mask )
+               lvl = LVL_mask;
+       latch->pin |= lvl;              // store lvl
+       latch->pin |= lvl << 3; // initialize clock
+
 #ifdef unix
-       if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size )
-               return bt->err = BTERR_read;
+       __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], 1);
 #else
-       SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
-       if( !ReadFile(bt->idx, page, bt->page_size, amt, NULL))
-               return bt->err = BTERR_read;
-       if( *amt <  bt->page_size )
-               return bt->err = BTERR_read;
+       _InterlockedAdd(&bt->latchmgr->cache[page->lvl], 1);
 #endif
-       return 0;
+       return bt->err = 0;
 }
 
 //     release latch pin
@@ -505,7 +577,7 @@ void bt_unpinlatch (BtLatchSet *latch)
 #ifdef unix
        __sync_fetch_and_add(&latch->pin, -1);
 #else
-       _InterlockedDecrement (&latch->pin);
+       _InterlockedDecrement16 (&latch->pin);
 #endif
 }
 
@@ -517,11 +589,12 @@ BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
 uint hashidx = page_no % bt->latchmgr->latchhash;
 BtLatchSet *latch;
 uint slot, idx;
+uint lvl, cnt;
 off64_t off;
 uint amt[1];
 BtPage page;
 
-  //  try to find unpinned entry
+  //  try to find our entry
 
   bt_spinwritelock(bt->table[hashidx].latch);
 
@@ -532,33 +605,21 @@ BtPage page;
                break;
   } while( slot = latch->next );
 
-  //  found our entry, bring to front of hash chain
+  //  found our entry
+  //  increment clock
 
   if( slot ) {
        latch = bt->latchsets + slot;
+       lvl = (latch->pin & LVL_mask) >> LVL_shift;
+       lvl *= CLOCK_unit * 2;
+       lvl |= CLOCK_unit;
 #ifdef unix
        __sync_fetch_and_add(&latch->pin, 1);
+       __sync_fetch_and_or(&latch->pin, lvl);
 #else
-       _InterlockedIncrement (&latch->pin);
+       _InterlockedIncrement16 (&latch->pin);
+       _InterlockedOr16 (&latch->pin, lvl);
 #endif
-       //  unlink our entry from its hash chain position
-
-       if( latch->prev )
-               bt->latchsets[latch->prev].next = latch->next;
-       else
-               bt->table[hashidx].slot = latch->next;
-
-       if( latch->next )
-               bt->latchsets[latch->next].prev = latch->prev;
-
-       //  now link into head of the hash chain
-
-       if( latch->next = bt->table[hashidx].slot )
-               bt->latchsets[latch->next].prev = slot;
-
-       bt->table[hashidx].slot = slot;
-       latch->prev = 0;
-
        bt_spinreleasewrite(bt->table[hashidx].latch);
        return latch;
   }
@@ -583,55 +644,71 @@ BtPage page;
 #else
        _InterlockedDecrement (&bt->latchmgr->latchdeployed);
 #endif
-  //  find and reuse previous lru lock entry on victim hash chain
+  //  find and reuse previous entry on victim
 
   while( 1 ) {
 #ifdef unix
-       idx = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
+       slot = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
 #else
-       idx = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
+       slot = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
 #endif
        // try to get write lock on hash chain
        //      skip entry if not obtained
-       //      or has outstanding locks
+       //      or has outstanding pins
 
-       idx %= bt->latchmgr->latchhash;
+       slot %= bt->latchmgr->latchtotal;
 
-       if( !bt_spinwritetry (bt->table[idx].latch) )
-               continue;
+       //      on slot wraparound, check census
+       //      count and increment safe level
 
-       if( slot = bt->table[idx].slot )
-         while( 1 ) {
-               latch = bt->latchsets + slot;
-               if( !latch->next )
-                       break;
-               slot = latch->next;
-         }
+       cnt = bt->latchmgr->cache[bt->latchmgr->safelevel];
 
-       if( !slot || latch->pin ) {
-               bt_spinreleasewrite (bt->table[idx].latch);
-               continue;
+       if( !slot ) {
+         if( cnt < bt->latchmgr->latchtotal / 10 )
+#ifdef unix
+               __sync_fetch_and_add(&bt->latchmgr->safelevel, 1);
+#else
+               _InterlockedIncrement (&bt->latchmgr->safelevel);
+#endif
+         continue;
        }
 
-       //  update permanent page area in btree
+       latch = bt->latchsets + slot;
+       idx = latch->page_no % bt->latchmgr->latchhash;
+       lvl = (latch->pin & LVL_mask) >> LVL_shift;
+
+       //      see if we are evicting this level yet
+
+       if( lvl > bt->latchmgr->safelevel )
+               continue;
+
+       if( !bt_spinwritetry (bt->table[idx].latch) )
+               continue;
 
-       page = (BtPage)(slot * bt->page_size + bt->latchpool);
-       off = latch->page_no << bt->page_bits;
+       if( latch->pin & ~LVL_mask ) {
+         if( latch->pin & CLOCK_mask )
 #ifdef unix
-       if( latch->dirty )
-         if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
-               return bt->err = BTERR_wrt, NULL;
+               __sync_fetch_and_add(&latch->pin, -CLOCK_unit);
 #else
-       if( latch->dirty ) {
-         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
+               _InterlockedExchangeAdd16 (&latch->pin, -CLOCK_unit);
+#endif
+         bt_spinreleasewrite (bt->table[idx].latch);
+         continue;
+       }
 
-         if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) )
-               return bt->err = BTERR_wrt, NULL;
+       //  update permanent page area in btree
 
-         if( *amt <  bt->page_size )
-               return bt->err = BTERR_wrt, NULL;
-       }
+       page = (BtPage)((uid)slot * bt->page_size + bt->pagepool);
+#ifdef unix
+       posix_fadvise (bt->idx, page_no << bt->page_bits, bt->page_size, POSIX_FADV_WILLNEED);
+       __sync_fetch_and_add (&bt->latchmgr->cache[page->lvl], -1);
+#else
+       _InterlockedAdd(&bt->latchmgr->cache[page->lvl], -1);
 #endif
+       if( page->dirty )
+         if( bt_writepage (bt, page, latch->page_no) )
+               return NULL;
+
        //  unlink our available slot from its hash chain
 
        if( latch->prev )
@@ -643,8 +720,10 @@ BtPage page;
                bt->latchsets[latch->next].prev = latch->prev;
 
        bt_spinreleasewrite (bt->table[idx].latch);
+
        if( bt_latchlink (bt, hashidx, slot, page_no) )
                return NULL;
+
        bt_spinreleasewrite (bt->table[hashidx].latch);
        return latch;
   }
@@ -704,43 +783,55 @@ struct flock lock[1];
        else if( bits < BT_minbits )
                bits = BT_minbits;
 
+       if( mode == BT_ro ) {
+               fprintf(stderr, "ReadOnly mode not supported: %s\n", name);
+               return NULL;
+       }
 #ifdef unix
        bt = calloc (1, sizeof(BtDb));
 
        bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
-
-       if( bt->idx == -1 )
+       posix_fadvise( bt->idx, 0, 0, POSIX_FADV_RANDOM);
+       if( bt->idx == -1 ) {
+               fprintf(stderr, "unable to open %s\n", name);
                return free(bt), NULL;
+       }
 #else
        bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
        attr = FILE_ATTRIBUTE_NORMAL;
        bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
 
-       if( bt->idx == INVALID_HANDLE_VALUE )
+       if( bt->idx == INVALID_HANDLE_VALUE ) {
+               fprintf(stderr, "unable to open %s\n", name);
                return GlobalFree(bt), NULL;
+       }
 #endif
 #ifdef unix
        memset (lock, 0, sizeof(lock));
        lock->l_len = sizeof(struct BtPage_);
        lock->l_type = F_WRLCK;
 
-       if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
+       if( fcntl (bt->idx, F_SETLKW, lock) < 0 ) {
+               fprintf(stderr, "unable to lock record zero %s\n", name);
                return bt_close (bt), NULL;
+       }
 #else
        memset (ovl, 0, sizeof(ovl));
-       len = sizeof(struct BtPage_);
 
        //      use large offsets to
        //      simulate advisory locking
 
        ovl->OffsetHigh |= 0x80000000;
 
-       if( LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) )
+       if( !LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) ) {
+               fprintf(stderr, "unable to lock record zero %s, GetLastError = %d\n", name, GetLastError());
                return bt_close (bt), NULL;
+       }
 #endif 
 
 #ifdef unix
-       latchmgr = malloc (BT_maxpage);
+       latchmgr = valloc (BT_maxpage);
        *amt = 0;
 
        // read minimum page size to get root info
@@ -748,20 +839,22 @@ struct flock lock[1];
        if( size = lseek (bt->idx, 0L, 2) ) {
                if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
                        bits = latchmgr->alloc->bits;
-               else
+               else {
+                       fprintf(stderr, "Unable to read page zero\n");
                        return free(bt), free(latchmgr), NULL;
-       } else if( mode == BT_ro )
-               return free(latchmgr), bt_close (bt), NULL;
+               }
+       }
 #else
        latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
        size = GetFileSize(bt->idx, amt);
 
        if( size || *amt ) {
-               if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
+               if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) ) {
+                       fprintf(stderr, "Unable to read page zero\n");
                        return bt_close (bt), NULL;
-               bits = latchmgr->alloc->bits;
-       } else if( mode == BT_ro )
-               return bt_close (bt), NULL;
+               } else
+                       bits = latchmgr->alloc->bits;
+       }
 #endif
 
        bt->page_size = 1 << bits;
@@ -769,8 +862,15 @@ struct flock lock[1];
 
        bt->mode = mode;
 
-       if( size || *amt )
+       if( size || *amt ) {
+               nlatchpage = latchmgr->nlatchpage;
                goto btlatch;
+       }
+
+       if( nodemax < 16 ) {
+               fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
+               return bt_close(bt), NULL;
+       }
 
        // initialize an empty b-tree with latch page, root page, page of leaves
        // and page(s) of latches and page pool cache
@@ -790,110 +890,104 @@ struct flock lock[1];
        latchmgr->nlatchpage = nlatchpage;
        latchmgr->latchtotal = nodemax;
        latchmgr->latchhash = latchhash;
-#ifdef unix
-       if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
-               return bt_close (bt), NULL;
-#else
-       if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
-               return bt_close (bt), NULL;
 
-       if( *amt < bt->page_size )
+       if( bt_writepage (bt, latchmgr->alloc, 0) ) {
+               fprintf (stderr, "Unable to create btree page zero\n");
                return bt_close (bt), NULL;
-#endif
+       }
+
        memset (latchmgr, 0, 1 << bits);
        latchmgr->alloc->bits = bt->page_bits;
 
        for( lvl=MIN_lvl; lvl--; ) {
+               last = MIN_lvl - lvl;   // page number
                slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
-               bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
+               bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? last + 1 : 0);
                key = keyptr(latchmgr->alloc, 1);
                key->len = 2;           // create stopper key
                key->key[0] = 0xff;
                key->key[1] = 0xff;
+
                latchmgr->alloc->min = bt->page_size - 3;
                latchmgr->alloc->lvl = lvl;
                latchmgr->alloc->cnt = 1;
                latchmgr->alloc->act = 1;
-#ifdef unix
-               if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
-                       return bt_close (bt), NULL;
-#else
-               if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
-                       return bt_close (bt), NULL;
 
-               if( *amt < bt->page_size )
+               if( bt_writepage (bt, latchmgr->alloc, last) ) {
+                       fprintf (stderr, "Unable to create btree page %.8x\n", last);
                        return bt_close (bt), NULL;
-#endif
+               }
        }
 
-       // clear out latch manager pages
+       // clear out buffer pool pages
 
        memset(latchmgr, 0, bt->page_size);
-       last = MIN_lvl + 1;
+       last = MIN_lvl + nlatchpage;
 
-       while( last < ((MIN_lvl + 1 + nlatchpage) ) ) {
-               off = (uid)last << bt->page_bits;
+       if( bt_writepage (bt, latchmgr->alloc, last) ) {
+               fprintf (stderr, "Unable to write buffer pool page %.8x\n", last);
+               return bt_close (bt), NULL;
+       }
+               
 #ifdef unix
-               pwrite(bt->idx, latchmgr, bt->page_size, off);
+       free (latchmgr);
 #else
-               SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
-               if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
-                       return bt_close (bt), NULL;
-               if( *amt < bt->page_size )
-                       return bt_close (bt), NULL;
+       VirtualFree (latchmgr, 0, MEM_RELEASE);
 #endif
-               last++;
-       }
 
 btlatch:
 #ifdef unix
        lock->l_type = F_UNLCK;
-       if( fcntl (bt->idx, F_SETLK, lock) < 0 )
+       if( fcntl (bt->idx, F_SETLK, lock) < 0 ) {
+               fprintf (stderr, "Unable to unlock page zero\n");
                return bt_close (bt), NULL;
+       }
 #else
-       if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) )
+       if( !UnlockFileEx (bt->idx, 0, sizeof(struct BtPage_), 0, ovl) ) {
+               fprintf (stderr, "Unable to unlock page zero, GetLastError = %d\n", GetLastError());
                return bt_close (bt), NULL;
+       }
 #endif
 #ifdef unix
        flag = PROT_READ | PROT_WRITE;
        bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
-       if( bt->latchmgr == MAP_FAILED )
+       if( bt->latchmgr == MAP_FAILED ) {
+               fprintf (stderr, "Unable to mmap page zero, errno = %d", errno);
                return bt_close (bt), NULL;
-       bt->table = (void *)mmap (0, bt->latchmgr->nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
-       if( bt->table == MAP_FAILED )
+       }
+       bt->table = (void *)mmap (0, (uid)nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
+       if( bt->table == MAP_FAILED ) {
+               fprintf (stderr, "Unable to mmap buffer pool, errno = %d", errno);
                return bt_close (bt), NULL;
+       }
+       madvise (bt->table, (uid)nlatchpage << bt->page_bits, MADV_RANDOM | MADV_WILLNEED);
 #else
        flag = PAGE_READWRITE;
-       bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, (bt->latchmgr->nlatchpage + LATCH_page) * bt->page_size, NULL);
-       if( !bt->halloc )
+       bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size, NULL);
+       if( !bt->halloc ) {
+               fprintf (stderr, "Unable to create file mapping for buffer pool mgr, GetLastError = %d\n", GetLastError());
                return bt_close (bt), NULL;
+       }
 
        flag = FILE_MAP_WRITE;
-       bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, (bt->latchmgr->nlatchpage + LATCH_page) * bt->page_size);
-       if( !bt->latchmgr )
-               return GetLastError(), bt_close (bt), NULL;
+       bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, ((uid)nlatchpage + LATCH_page) * bt->page_size);
+       if( !bt->latchmgr ) {
+               fprintf (stderr, "Unable to map buffer pool, GetLastError = %d\n", GetLastError());
+               return bt_close (bt), NULL;
+       }
 
        bt->table = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
 #endif
-       bt->latchpool = (unsigned char *)bt->table + (bt->latchmgr->nlatchpage - bt->latchmgr->latchtotal) * bt->page_size;
-       bt->latchsets = (BtLatchSet *)(bt->latchpool - bt->latchmgr->latchtotal * sizeof(BtLatchSet));
-
-#ifdef unix
-       free (latchmgr);
-#else
-       VirtualFree (latchmgr, 0, MEM_RELEASE);
-#endif
+       bt->pagepool = (unsigned char *)bt->table + (uid)(nlatchpage - bt->latchmgr->latchtotal) * bt->page_size;
+       bt->latchsets = (BtLatchSet *)(bt->pagepool - (uid)bt->latchmgr->latchtotal * sizeof(BtLatchSet));
 
 #ifdef unix
-       bt->mem = malloc (3 * bt->page_size);
+       bt->mem = valloc (2 * bt->page_size);
 #else
-       bt->mem = VirtualAlloc(NULL, 3 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
+       bt->mem = VirtualAlloc(NULL, 2 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
 #endif
        bt->frame = (BtPage)bt->mem;
        bt->cursor = (BtPage)(bt->mem + bt->page_size);
-       bt->zero = (BtPage)(bt->mem + 2 * bt->page_size);
-
-       memset (bt->zero, 0, bt->page_size);
        return bt;
 }
 
@@ -950,9 +1044,6 @@ uid bt_newpage(BtDb *bt, BtPage page)
 BtLatchSet *latch;
 uid new_page;
 BtPage temp;
-off64_t off;
-uint amt[1];
-int reuse;
 
        //      lock allocation page
 
@@ -971,29 +1062,19 @@ int reuse;
          bt_spinreleasewrite(bt->latchmgr->lock);
          memcpy (temp, page, bt->page_size);
 
-         if( bt_update (bt, temp, latch) )
-               return 0;
-
+         bt_update (bt, temp);
          bt_unpinlatch (latch);
+         return new_page;
        } else {
          new_page = bt_getid(bt->latchmgr->alloc->right);
          bt_putid(bt->latchmgr->alloc->right, new_page+1);
          bt_spinreleasewrite(bt->latchmgr->lock);
-         off = new_page << bt->page_bits;
-#ifdef unix
-         if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
-               return bt->err = BTERR_wrt, 0;
-#else
-         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
 
-         if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) )
-               return bt->err = BTERR_wrt, 0;
-
-         if( *amt <  bt->page_size )
-               return bt->err = BTERR_wrt, 0;
-#endif
+         if( bt_writepage (bt, page, new_page) )
+               return 0;
        }
 
+       bt_update (bt, bt->latchmgr->alloc);
        return new_page;
 }
 
@@ -1018,23 +1099,23 @@ int ans;
 
 //  Update current page of btree by
 //     flushing mapped area to disk backing of cache pool.
+//     mark page as dirty for rewrite to permanent location
 
-BTERR bt_update (BtDb *bt, BtPage page, BtLatchSet *latch)
+void bt_update (BtDb *bt, BtPage page)
 {
 #ifdef unix
        msync (page, bt->page_size, MS_ASYNC);
 #else
-       FlushViewOfFile (page, bt->page_size);
+//     FlushViewOfFile (page, bt->page_size);
 #endif
-       latch->dirty = 1;
-       return 0;
+       page->dirty = 1;
 }
 
 //  map the btree cached page onto current page
 
 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
 {
-       return (BtPage)((latch - bt->latchsets) * bt->page_size + bt->latchpool);
+       return (BtPage)((uid)(latch - bt->latchsets) * bt->page_size + bt->pagepool);
 }
 
 //     deallocate a deleted page 
@@ -1052,10 +1133,9 @@ BtPage page = bt_mappage (bt, latch);
        //      store chain in second right
        bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
        bt_putid(bt->latchmgr->alloc[1].right, page_no);
-       page->free = 1;
 
-       if( bt_update(bt, page, latch) )
-               return bt->err;
+       page->free = 1;
+       bt_update(bt, page);
 
        // unlock released page
 
@@ -1066,6 +1146,7 @@ BtPage page = bt_mappage (bt, latch);
        // unlock allocation page
 
        bt_spinreleasewrite (bt->latchmgr->lock);
+       bt_update (bt, bt->latchmgr->alloc);
        return 0;
 }
 
@@ -1208,14 +1289,12 @@ BtKey ptr;
        memcpy(rightkey, ptr, ptr->len + 1);
 
        memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
-       bt->page->dirty = 1;
+       bt->page->clean = 1;
 
        ptr = keyptr(bt->page, bt->page->cnt);
        memcpy(leftkey, ptr, ptr->len + 1);
 
-       if( bt_update (bt, bt->page, latch) )
-               return bt->err;
-
+       bt_update (bt, bt->page);
        bt_lockpage (BtLockParent, latch);
        bt_unlockpage (BtLockWrite, latch);
 
@@ -1263,8 +1342,7 @@ uint idx;
        bt_lockpage (BtLockWrite, latch);
        memcpy (root, temp, bt->page_size);
 
-       if( bt_update (bt, root, bt->latch) )
-               return bt->err;
+       bt_update (bt, root);
 
        if( bt_freepage (bt, child, latch) )
                return bt->err;
@@ -1302,7 +1380,7 @@ BtKey ptr;
        if( found = !keycmp (ptr, key, len) )
          if( found = slotptr(bt->page, slot)->dead == 0 ) {
                dirty = slotptr(bt->page,slot)->dead = 1;
-               bt->page->dirty = 1;
+               bt->page->clean = 1;
                bt->page->act--;
 
                // collapse empty slots
@@ -1346,8 +1424,7 @@ BtKey ptr;
        // return if page is not empty
 
        if( bt->page->act ) {
-         if( bt_update(bt, bt->page, latch) )
-               return bt->err;
+         bt_update(bt, bt->page);
          bt_unlockpage(BtLockWrite, latch);
          bt_unpinlatch (latch);
          return bt->found = found, 0;
@@ -1388,11 +1465,8 @@ BtKey ptr;
        bt_putid(temp->right, page_no);
        temp->kill = 1;
 
-       if( bt_update(bt, bt->page, latch) )
-               return bt->err;
-
-       if( bt_update(bt, temp, rlatch) )
-               return bt->err;
+       bt_update(bt, bt->page);
+       bt_update(bt, temp);
 
        bt_lockpage(BtLockParent, latch);
        bt_unlockpage(BtLockWrite, latch);
@@ -1471,7 +1545,7 @@ int ret;
 
        //      skip cleanup if nothing to reclaim
 
-       if( !page->dirty )
+       if( !page->clean )
                return 0;
 
        memcpy (bt->frame, page, bt->page_size);
@@ -1556,8 +1630,7 @@ uid right;
 
        // update and release root (bt->page)
 
-       if( bt_update(bt, root, bt->latch) )
-               return bt->err;
+       bt_update(bt, root);
 
        bt_unlockpage(BtLockWrite, bt->latch);
        bt_unpinlatch(bt->latch);
@@ -1624,7 +1697,7 @@ BtKey key;
        memcpy (bt->frame, page, bt->page_size);
        memset (page+1, 0, bt->page_size - sizeof(*page));
        nxt = bt->page_size;
-       page->dirty = 0;
+       page->clean = 0;
        page->act = 0;
        cnt = 0;
        idx = 0;
@@ -1666,8 +1739,7 @@ BtKey key;
 
        // update left (containing) node
 
-       if( bt_update(bt, page, latch) )
-               return bt->err;
+       bt_update(bt, page);
 
        bt_lockpage (BtLockParent, latch);
        bt_unlockpage (BtLockWrite, latch);
@@ -1721,8 +1793,7 @@ BtKey ptr;
          slotptr(page, slot)->tod = tod;
 #endif
          bt_putid(slotptr(page,slot)->id, id);
-         if( bt_update(bt, bt->page, bt->latch) )
-               return bt->err;
+         bt_update(bt, bt->page);
          bt_unlockpage(BtLockWrite, bt->latch);
          bt_unpinlatch (bt->latch);
          return 0;
@@ -1765,8 +1836,7 @@ BtKey ptr;
 #endif
   slotptr(page, slot)->dead = 0;
 
-  if( bt_update(bt, bt->page, bt->latch) )
-         return bt->err;
+  bt_update(bt, bt->page);
 
   bt_unlockpage(BtLockWrite, bt->latch);
   bt_unpinlatch(bt->latch);
@@ -1855,17 +1925,21 @@ uint bt_audit (BtDb *bt)
 uint idx, hashidx;
 uid next, page_no;
 BtLatchSet *latch;
+uint blks[64];
 uint cnt = 0;
 BtPage page;
-off64_t off;
 uint amt[1];
 BtKey ptr;
 
 #ifdef unix
+       posix_fadvise( bt->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
+#endif
        if( *(ushort *)(bt->latchmgr->lock) )
                fprintf(stderr, "Alloc page locked\n");
        *(ushort *)(bt->latchmgr->lock) = 0;
 
+       memset (blks, 0, sizeof(blks));
+
        for( idx = 1; idx <= bt->latchmgr->latchdeployed; idx++ ) {
                latch = bt->latchsets + idx;
                if( *(ushort *)latch->readwr )
@@ -1880,48 +1954,36 @@ BtKey ptr;
                        fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
                *(ushort *)latch->parent = 0;
 
-               if( latch->pin ) {
+               if( latch->pin & PIN_mask ) {
                        fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
                        latch->pin = 0;
                }
-               page = (BtPage)(idx * bt->page_size + bt->latchpool);
-               off = latch->page_no << bt->page_bits;
-#ifdef unix
-           if( latch->dirty )
-                if( pwrite(bt->idx, page, bt->page_size, off) < bt->page_size )
-                       fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
-#else
-           if( latch->dirty ) {
-                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
-
-                 if( !WriteFile(bt->idx, page, bt->page_size, amt, NULL) )
-                       fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
+               page = (BtPage)((uid)idx * bt->page_size + bt->pagepool);
+               blks[page->lvl]++;
 
-                 if( *amt <  bt->page_size )
+           if( page->dirty )
+                if( bt_writepage (bt, page, latch->page_no) )
                        fprintf(stderr, "Page %.8x Write Error\n", latch->page_no);
-           }
-#endif
-           latch->dirty = 0;
        }
 
+       for( idx = 0; blks[idx]; idx++ )
+               fprintf(stderr, "cache: %d lvl %d blocks\n", blks[idx], idx);
+
        for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
          if( *(ushort *)(bt->table[hashidx].latch) )
                        fprintf(stderr, "hash entry %d locked\n", hashidx);
 
          *(ushort *)(bt->table[hashidx].latch) = 0;
-
-         if( idx = bt->table[hashidx].slot ) do {
-               latch = bt->latchsets + idx;
-               if( latch->pin )
-                       fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
-         } while( idx = latch->next );
        }
 
+       memset (blks, 0, sizeof(blks));
+
        next = bt->latchmgr->nlatchpage + LATCH_page;
        page_no = LEAF_page;
 
        while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
-               pread (bt->idx, bt->frame, bt->page_size, page_no << bt->page_bits);
+               if( bt_readpage (bt, bt->frame, page_no) )
+                 fprintf(stderr, "page %.8x unreadable\n", page_no);
                if( !bt->frame->free ) {
                 for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
                  ptr = keyptr(bt->frame, idx+1);
@@ -1930,16 +1992,18 @@ BtKey ptr;
                 }
                 if( !bt->frame->lvl )
                        cnt += bt->frame->act;
+                blks[bt->frame->lvl]++;
                }
 
                if( page_no > LEAF_page )
                        next = page_no + 1;
                page_no = next;
        }
+
+       for( idx = 0; blks[idx]; idx++ )
+               fprintf(stderr, "btree: %d lvl %d blocks\n", blks[idx], idx);
+
        return cnt - 1;
-#else
-       return 0;
-#endif
 }
 
 #ifndef unix
@@ -2013,15 +2077,20 @@ int ch, cnt = 0, bits = 12, idx;
 unsigned char key[256];
 double done, start;
 uid next, page_no;
+BtLatchSet *latch;
 float elapsed;
 time_t tod[1];
 uint scan = 0;
 uint len = 0;
 uint map = 0;
+BtPage page;
 BtKey ptr;
 BtDb *bt;
 FILE *in;
 
+#ifdef WIN32
+       _setmode (1, _O_BINARY);
+#endif
        if( argc < 4 ) {
                fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find/Count [page_bits mapped_pool_pages start_line_number]\n", argv[0]);
                fprintf (stderr, "  page_bits: size of btree page in bits\n");
@@ -2050,15 +2119,27 @@ FILE *in;
 
        switch(argv[3][0]| 0x20)
        {
-       case 'a':
-               fprintf(stderr, "started audit for %s\n", argv[2]);
+       case 'p':       // display page
+               if( latch = bt_pinlatch (bt, off) )
+                       page = bt_mappage (bt, latch);
+               else
+                       fprintf(stderr, "unable to read page %.8x\n", off);
+
+               write (1, page, bt->page_size);
+               break;
+
+       case 'a':       // buffer pool audit
+               fprintf(stderr, "started audit for %s\n", argv[1]);
                cnt = bt_audit (bt);
-               fprintf(stderr, "finished audit for %s, %d keys\n", argv[2], cnt);
+               fprintf(stderr, "finished audit for %s, %d keys\n", argv[1], cnt);
                break;
 
-       case 'w':
+       case 'w':       // write keys
                fprintf(stderr, "started indexing for %s\n", argv[2]);
-               if( argc > 2 && (in = fopen (argv[2], "rb")) )
+               if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
+#ifdef unix
+                 posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
+#endif
                  while( ch = getc(in), ch != EOF )
                        if( ch == '\n' )
                        {
@@ -2071,12 +2152,16 @@ FILE *in;
                        }
                        else if( len < 245 )
                                key[len++] = ch;
+                 }
                fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line);
                break;
 
-       case 'd':
+       case 'd':       // delete keys
                fprintf(stderr, "started deleting keys for %s\n", argv[2]);
-               if( argc > 2 && (in = fopen (argv[2], "rb")) )
+               if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
+#ifdef unix
+                 posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
+#endif
                  while( ch = getc(in), ch != EOF )
                        if( ch == '\n' )
                        {
@@ -2089,12 +2174,16 @@ FILE *in;
                        }
                        else if( len < 245 )
                                key[len++] = ch;
+                 }
                fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line);
                break;
 
-       case 'f':
+       case 'f':       // find keys
                fprintf(stderr, "started finding keys for %s\n", argv[2]);
-               if( argc > 2 && (in = fopen (argv[2], "rb")) )
+               if( argc > 2 && (in = fopen (argv[2], "rb")) ) {
+#ifdef unix
+                 posix_fadvise( fileno(in), 0, 0, POSIX_FADV_NOREUSE);
+#endif
                  while( ch = getc(in), ch != EOF )
                        if( ch == '\n' )
                        {
@@ -2109,10 +2198,11 @@ FILE *in;
                        }
                        else if( len < 245 )
                                key[len++] = ch;
+                 }
                fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found);
                break;
 
-       case 's':
+       case 's':       // scan and print keys
                fprintf(stderr, "started scaning\n");
                cnt = len = key[0] = 0;
 
@@ -2131,7 +2221,7 @@ FILE *in;
                fprintf(stderr, " Total keys read %d\n", cnt - 1);
                break;
 
-       case 'c':
+       case 'c':       // count keys
          fprintf(stderr, "started counting\n");
          cnt = 0;
 
@@ -2139,8 +2229,6 @@ FILE *in;
          page_no = LEAF_page;
 
          while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
-         BtLatchSet *latch;
-         BtPage page;
                if( latch = bt_pinlatch (bt, page_no) )
                        page = bt_mappage (bt, latch);
                if( !page->free && !page->lvl )