]> pd.if.org Git - btree/blobdiff - threadskv5.c
Fix small bug in main when there is less t han one input file
[btree] / threadskv5.c
index 49047e71cec6318c7980b4cc81e3c5e731484eb7..ec46ace3a3f7738d261fe9587ecf1ae7df7940d9 100644 (file)
@@ -5,7 +5,7 @@
 //     duplicate key management
 //     and bi-directional cursors
 
-// 08 SEP 2014
+// 09 SEP 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -144,7 +144,7 @@ typedef struct {
        volatile uid page_no;           // latch set page number
 } BtLatchSet;
 
-//     Define the length of the page and key pointers
+//     Define the length of the page record numbers
 
 #define BtId 6
 
@@ -184,17 +184,20 @@ typedef struct {
 //     bytes.
 
 typedef struct {
-       unsigned char len;
-       unsigned char key[1];
-} *BtKey;
+       unsigned char len;              // this can be changed to a ushort or uint
+       unsigned char key[0];
+} BtKey;
 
 //     the value structure also occupies space at the upper
 //     end of the page. Each key is immediately followed by a value.
 
 typedef struct {
-       unsigned char len;
-       unsigned char value[1];
-} *BtVal;
+       unsigned char len;              // this can be changed to a ushort or uint
+       unsigned char value[0];
+} BtVal;
+
+#define BT_maxkey      255             // maximum number of bytes in a key
+#define BT_keyarray (BT_maxkey + sizeof(BtKey))
 
 //     The first part of an index page.
 //     It is immediately followed
@@ -214,7 +217,7 @@ typedef struct BtPage_ {
        unsigned char lvl:7;            // level of page
        unsigned char kill:1;           // page is being deleted
        unsigned char left[BtId];       // page number to left
-       unsigend char filler[2];        // padding to multiple of 8
+       unsigned char filler[2];        // padding to multiple of 8
        unsigned char right[BtId];      // page number to right
 } *BtPage;
 
@@ -289,9 +292,9 @@ typedef struct {
        BtMgr *mgr;                             // buffer manager for thread
        BtPage cursor;                  // cached frame for start/next (never mapped)
        BtPage frame;                   // spare frame for the page split (never mapped)
-       uid cursor_page;                        // current cursor page number   
-       unsigned char *mem;             // frame, cursor, page memory buffer
-       unsigned char key[256]; // last found complete key
+       uid cursor_page;                                // current cursor page number   
+       unsigned char *mem;                             // frame, cursor, page memory buffer
+       unsigned char key[BT_keyarray]; // last found complete key
        int found;                              // last delete or insert was found
        int err;                                // last error
 } BtDb;
@@ -312,7 +315,7 @@ extern BtDb *bt_open (BtMgr *mgr);
 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
-extern BtKey bt_foundkey (BtDb *bt);
+extern BtKey *bt_foundkey (BtDb *bt);
 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_nextkey   (BtDb *bt, uint slot);
 
@@ -323,8 +326,8 @@ void bt_mgrclose (BtMgr *mgr);
 //  Helper functions to return slot values
 //     from the cursor page.
 
-extern BtKey bt_key (BtDb *bt, uint slot);
-extern BtVal bt_val (BtDb *bt, uint slot);
+extern BtKey *bt_key (BtDb *bt, uint slot);
+extern BtVal *bt_val (BtDb *bt, uint slot);
 
 //  BTree page number constants
 #define ALLOC_page             0       // allocation & latch manager hash table
@@ -388,8 +391,8 @@ extern BtVal bt_val (BtDb *bt, uint slot);
 //     Page slots use 1 based indexing.
 
 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
-#define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
-#define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
+#define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
+#define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
 
 void bt_putid(unsigned char *dest, uid id)
 {
@@ -839,13 +842,13 @@ BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uin
 uint lvl, attr, cacheblk, last, slot, idx;
 uint nlatchpage, latchhash;
 unsigned char value[BtId];
+int flag, initit = 0;
 BtLatchMgr *latchmgr;
 off64_t size;
 uint amt[1];
 BtMgr* mgr;
-BtKey key;
-BtVal val;
-int flag;
+BtKey* key;
+BtVal *val;
 
 #ifndef unix
 SYSTEM_INFO sysinfo[1];
@@ -889,14 +892,21 @@ SYSTEM_INFO sysinfo[1];
        *amt = 0;
 
        // read minimum page size to get root info
+       //      to support raw disk partition files
+       //      check if bits == 0 on the disk.
 
        if( size = lseek (mgr->idx, 0L, 2) ) {
                if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
-                       bits = latchmgr->alloc->bits;
+                       if( latchmgr->alloc->bits )
+                               bits = latchmgr->alloc->bits;
+                       else
+                               initit = 1;
                else
                        return free(mgr), free(latchmgr), NULL;
        } else if( mode == BT_ro )
                return free(latchmgr), bt_mgrclose (mgr), NULL;
+       else
+               initit = 1;
 #else
        latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
        size = GetFileSize(mgr->idx, amt);
@@ -907,6 +917,8 @@ SYSTEM_INFO sysinfo[1];
                bits = latchmgr->alloc->bits;
        } else if( mode == BT_ro )
                return bt_mgrclose (mgr), NULL;
+       else
+               initit = 1;
 #endif
 
        mgr->page_size = 1 << bits;
@@ -944,7 +956,7 @@ SYSTEM_INFO sysinfo[1];
        mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
 #endif
 
-       if( size || *amt )
+       if( !initit )
                goto mgrlatch;
 
        // initialize an empty b-tree with latch page, root page, page of leaves
@@ -969,8 +981,13 @@ SYSTEM_INFO sysinfo[1];
 
        latchmgr->latchhash = latchhash;
 
+       //  initialize left-most LEAF page in
+       //      alloc->left.
+
+       bt_putid (latchmgr->alloc->left, LEAF_page);
+
 #ifdef unix
-       if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
+       if( pwrite (mgr->idx, latchmgr, mgr->page_size, 0) < mgr->page_size )
                return bt_mgrclose (mgr), NULL;
 #else
        if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
@@ -984,7 +1001,7 @@ SYSTEM_INFO sysinfo[1];
        latchmgr->alloc->bits = mgr->page_bits;
 
        for( lvl=MIN_lvl; lvl--; ) {
-               slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
+               slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
                key = keyptr(latchmgr->alloc, 1);
                key->len = 2;           // create stopper key
                key->key[0] = 0xff;
@@ -1000,7 +1017,7 @@ SYSTEM_INFO sysinfo[1];
                latchmgr->alloc->cnt = 1;
                latchmgr->alloc->act = 1;
 #ifdef unix
-               if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
+               if( pwrite (mgr->idx, latchmgr, mgr->page_size, (MIN_lvl - lvl) << bits) < mgr->page_size )
                        return bt_mgrclose (mgr), NULL;
 #else
                if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
@@ -1088,7 +1105,7 @@ BtDb *bt = malloc (sizeof(*bt));
 //  compare two keys, returning > 0, = 0, or < 0
 //  as the comparison value
 
-int keycmp (BtKey key1, unsigned char *key2, uint len2)
+int keycmp (BtKey* key1, unsigned char *key2, uint len2)
 {
 uint len1 = key1->len;
 int ans;
@@ -1190,7 +1207,6 @@ uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
 BtPage page;
 
        page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
-//     madvise (page, bt->mgr->page_size, MADV_WILLNEED);
        return page;
 }
 
@@ -1378,6 +1394,7 @@ void bt_unlockpage(BtLock mode, BtLatchSet *set)
 }
 
 //     allocate a new page
+//     return with page pinned.
 
 BTERR bt_newpage(BtDb *bt, BtPageSet *set)
 {
@@ -1588,30 +1605,21 @@ void bt_freepage (BtDb *bt, BtPageSet *set)
 
 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
 {
-unsigned char leftkey[256], rightkey[256];
+unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
 unsigned char value[BtId];
-BtKey ptr;
+BtKey* ptr;
 uint idx;
 
-       // collapse empty slots beneath our slot
-
-       while( idx = set->page->cnt - 1 )
-         if( slotptr(set->page, idx)->dead ) {
-               *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
-               memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
-         } else
-               break;
-
        //      remove the old fence value
 
        ptr = keyptr(set->page, set->page->cnt);
-       memcpy (rightkey, ptr, ptr->len + 1);
+       memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
        memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
 
        //  cache new fence value
 
        ptr = keyptr(set->page, set->page->cnt);
-       memcpy (leftkey, ptr, ptr->len + 1);
+       memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
 
        bt_lockpage (BtLockParent, set->latch);
        bt_unlockpage (BtLockWrite, set->latch);
@@ -1619,13 +1627,16 @@ uint idx;
        //      insert new (now smaller) fence key
 
        bt_putid (value, set->page_no);
+       ptr = (BtKey*)leftkey;
 
-       if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId, 1) )
+       if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
          return bt->err;
 
        //      now delete old fence key
 
-       if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
+       ptr = (BtKey*)rightkey;
+
+       if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
                return bt->err;
 
        bt_unlockpage (BtLockParent, set->latch);
@@ -1698,6 +1709,7 @@ BtPageSet right2[1];
        bt_putid(right2->page->left, left_page_no);
        bt_unlockpage (BtLockWrite, right2->latch);
        bt_unpinlatch (right2->latch);
+       bt_unpinpool (right2->pool);
        return 0;
 }
 
@@ -1706,12 +1718,12 @@ BtPageSet right2[1];
 
 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
 {
-unsigned char lowerfence[256], higherfence[256];
-BtPageSet set[1], right[1], right2[1];
+unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
+uint slot, idx, found, fence;
+BtPageSet set[1], right[1];
 unsigned char value[BtId];
-uint slot, idx, found;
-BtKey ptr, tst;
-BtVal val;
+BtKey *ptr, *tst;
+BtVal *val;
 
        if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
                ptr = keyptr(set->page, slot);
@@ -1723,19 +1735,30 @@ BtVal val;
        if( slotptr(set->page, slot)->type == Librarian )
                ptr = keyptr(set->page, ++slot);
 
+       fence = slot == set->page->cnt;
+
        // if key is found delete it, otherwise ignore request
 
        if( found = !keycmp (ptr, key, len) )
          if( found = slotptr(set->page, slot)->dead == 0 ) {
                val = valptr(set->page,slot);
                slotptr(set->page, slot)->dead = 1;
-               set->page->garbage += ptr->len + val->len + 2;
+               set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
                set->page->act--;
+
+               // collapse empty slots beneath the fence
+
+               while( idx = set->page->cnt - 1 )
+                 if( slotptr(set->page, idx)->dead ) {
+                       *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
+                       memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
+                 } else
+                       break;
          }
 
        //      did we delete a fence key in an upper level?
 
-       if( found && lvl && set->page->act && slot == set->page->cnt )
+       if( found && lvl && set->page->act && fence )
          if( bt_fixfence (bt, set, lvl) )
                return bt->err;
          else
@@ -1762,7 +1785,7 @@ BtVal val;
        //      to post in parent
 
        ptr = keyptr(set->page, set->page->cnt);
-       memcpy (lowerfence, ptr, ptr->len + 1);
+       memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
 
        //      obtain lock on right page
 
@@ -1797,7 +1820,7 @@ BtVal val;
        // cache copy of key to update
 
        ptr = keyptr(right->page, right->page->cnt);
-       memcpy (higherfence, ptr, ptr->len + 1);
+       memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
 
        // mark right page deleted and point it to left page
        //      until we can post parent updates
@@ -1814,13 +1837,16 @@ BtVal val;
        // redirect higher key directly to our new node contents
 
        bt_putid (value, set->page_no);
+       ptr = (BtKey*)higherfence;
 
-       if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId, 1) )
+       if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
          return bt->err;
 
        //      delete old lower key to our node
 
-       if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
+       ptr = (BtKey*)lowerfence;
+
+       if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
          return bt->err;
 
        //      obtain delete and write locks to right node
@@ -1837,9 +1863,9 @@ BtVal val;
        return 0;
 }
 
-BtKey bt_foundkey (BtDb *bt)
+BtKey *bt_foundkey (BtDb *bt)
 {
-       return (BtKey)(bt->key);
+       return (BtKey*)(bt->key);
 }
 
 //     advance to next slot
@@ -1892,8 +1918,8 @@ int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value,
 BtPageSet set[1];
 uint len, slot;
 int ret = -1;
-BtKey ptr;
-BtVal val;
+BtKey *ptr;
+BtVal *val;
 
   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
    do {
@@ -1906,12 +1932,18 @@ BtVal val;
 
        //      return actual key found
 
-       memcpy (bt->key, ptr, ptr->len + 1);
+       memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
        len = ptr->len;
 
        if( slotptr(set->page, slot)->type == Duplicate )
                len -= BtId;
 
+       //      not there if we reach the stopper key
+
+       if( slot == set->page->cnt )
+         if( !bt_getid (set->page->right) )
+               break;
+
        // if key exists, return >= 0 value bytes copied
        //      otherwise return (-1)
 
@@ -1948,10 +1980,10 @@ uint nxt = bt->mgr->page_size;
 uint cnt = 0, idx = 0;
 uint max = page->cnt;
 uint newslot = max;
-BtKey key;
-BtVal val;
+BtKey *key;
+BtVal *val;
 
-       if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
+       if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
                return slot;
 
        //      skip cleanup and proceed to split
@@ -1975,29 +2007,28 @@ BtVal val;
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 2;
-               if( cnt < max && slotptr(bt->frame,cnt)->dead )
+
+               if( cnt < max || page->lvl )
+                 if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy the value across
 
                val = valptr(bt->frame, cnt);
-               nxt -= val->len + 1;
-               ((unsigned char *)page)[nxt] = val->len;
-               memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
+               nxt -= val->len + sizeof(BtVal);
+               memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
 
                // copy the key across
 
                key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)page + nxt, key, key->len + 1);
+               nxt -= key->len + sizeof(BtKey);
+               memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
 
                // make a librarian slot
 
-               if( idx ) {
-                       slotptr(page, ++idx)->off = nxt;
-                       slotptr(page, idx)->type = Librarian;
-                       slotptr(page, idx)->dead = 1;
-               }
+               slotptr(page, ++idx)->off = nxt;
+               slotptr(page, idx)->type = Librarian;
+               slotptr(page, idx)->dead = 1;
 
                // set up the slot
 
@@ -2013,7 +2044,7 @@ BtVal val;
 
        //      see if page has enough space now, or does it need splitting?
 
-       if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
+       if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
                return newslot;
 
        return 0;
@@ -2021,11 +2052,13 @@ BtVal val;
 
 // split the root and raise the height of the btree
 
-BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, BtPageSet *right)
+BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
 {
 uint nxt = bt->mgr->page_size;
 unsigned char value[BtId];
 BtPageSet left[1];
+BtKey *ptr;
+BtVal *val;
 
        //  Obtain an empty page to use, and copy the current
        //  root contents into it, e.g. lower keys
@@ -2046,30 +2079,34 @@ BtPageSet left[1];
 
        memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
 
+       // insert stopper key at top of newroot page
+       // and increase the root height
+
+       nxt -= BtId + sizeof(BtVal);
+       bt_putid (value, right->page_no);
+       val = (BtVal *)((unsigned char *)root->page + nxt);
+       memcpy (val->value, value, BtId);
+       val->len = BtId;
+
+       nxt -= 2 + sizeof(BtKey);
+       slotptr(root->page, 2)->off = nxt;
+       ptr = (BtKey *)((unsigned char *)root->page + nxt);
+       ptr->len = 2;
+       ptr->key[0] = 0xff;
+       ptr->key[1] = 0xff;
+
        // insert lower keys page fence key on newroot page as first key
 
-       nxt -= BtId + 1;
+       nxt -= BtId + sizeof(BtVal);
        bt_putid (value, left->page_no);
-       ((unsigned char *)root->page)[nxt] = BtId;
-       memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
+       val = (BtVal *)((unsigned char *)root->page + nxt);
+       memcpy (val->value, value, BtId);
+       val->len = BtId;
 
-       nxt -= *leftkey + 1;
-       memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
+       nxt -= leftkey->len + sizeof(BtKey);
        slotptr(root->page, 1)->off = nxt;
+       memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
        
-       // insert stopper key on newroot page
-       // and increase the root height
-
-       nxt -= 3 + BtId + 1;
-       ((unsigned char *)root->page)[nxt] = 2;
-       ((unsigned char *)root->page)[nxt+1] = 0xff;
-       ((unsigned char *)root->page)[nxt+2] = 0xff;
-
-       bt_putid (value, right->page_no);
-       ((unsigned char *)root->page)[nxt+3] = BtId;
-       memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
-       slotptr(root->page, 2)->off = nxt;
-
        bt_putid(root->page->right, 0);
        root->page->min = nxt;          // reset lowest used offset and key count
        root->page->cnt = 2;
@@ -2089,15 +2126,15 @@ BtPageSet left[1];
 
 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
 {
+unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
-unsigned char fencekey[256], rightkey[256];
 unsigned char value[BtId];
 uint lvl = set->page->lvl;
 BtPageSet right[1];
+BtKey *key, *ptr;
+BtVal *val, *src;
 uid right2;
 uint prev;
-BtKey key;
-BtVal val;
 
        //  split higher half of keys to bt->frame
 
@@ -2107,24 +2144,22 @@ BtVal val;
        idx = 0;
 
        while( cnt++ < max ) {
-               if( slotptr(set->page, cnt)->dead && cnt < max )
+               if( cnt < max || set->page->lvl )
+                 if( slotptr(set->page, cnt)->dead )
                        continue;
-               val = valptr(set->page, cnt);
-               nxt -= val->len + 1;
-               ((unsigned char *)bt->frame)[nxt] = val->len;
-               memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
+               src = valptr(set->page, cnt);
+               nxt -= src->len + sizeof(BtVal);
+               memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
 
                key = keyptr(set->page, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
+               nxt -= key->len + sizeof(BtKey);
+               memcpy ((unsigned char *)bt->frame + nxt, key, key->len + sizeof(BtVal));
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(bt->frame, ++idx)->off = nxt;
-                       slotptr(bt->frame, idx)->type = Librarian;
-                       slotptr(bt->frame, idx)->dead = 1;
-               }
+               slotptr(bt->frame, ++idx)->off = nxt;
+               slotptr(bt->frame, idx)->type = Librarian;
+               slotptr(bt->frame, idx)->dead = 1;
 
                //  add actual slot
 
@@ -2137,7 +2172,7 @@ BtVal val;
 
        // remember existing fence key for new page to the right
 
-       memcpy (rightkey, key, key->len + 1);
+       memcpy (rightkey, key, key->len + sizeof(BtKey));
 
        bt->frame->bits = bt->mgr->page_bits;
        bt->frame->min = nxt;
@@ -2163,7 +2198,6 @@ BtVal val;
                return bt->err;
 
        memcpy (right->page, bt->frame, bt->mgr->page_size);
-       bt_unpinpool (right->pool);
 
        //      update lower keys to continue in old page
 
@@ -2184,22 +2218,19 @@ BtVal val;
        while( cnt++ < max ) {
                if( slotptr(bt->frame, cnt)->dead )
                        continue;
-               val = valptr(bt->frame, cnt);
-               nxt -= val->len + 1;
-               ((unsigned char *)set->page)[nxt] = val->len;
-               memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
+               src = valptr(bt->frame, cnt);
+               nxt -= src->len + sizeof(BtVal);
+               memcpy ((unsigned char *)set->page + nxt, src, src->len + sizeof(BtVal));
 
                key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
+               nxt -= key->len + sizeof(BtKey);
+               memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(set->page, ++idx)->off = nxt;
-                       slotptr(set->page, idx)->type = Librarian;
-                       slotptr(set->page, idx)->dead = 1;
-               }
+               slotptr(set->page, ++idx)->off = nxt;
+               slotptr(set->page, idx)->type = Librarian;
+               slotptr(set->page, idx)->dead = 1;
 
                //      add actual slot
 
@@ -2210,7 +2241,7 @@ BtVal val;
 
        // remember fence key for smaller page
 
-       memcpy(fencekey, key, key->len + 1);
+       memcpy(fencekey, key, key->len + sizeof(BtKey));
 
        bt_putid(set->page->right, right->page_no);
        set->page->min = nxt;
@@ -2219,7 +2250,7 @@ BtVal val;
        // if current page is the root page, split it
 
        if( set->page_no == ROOT_page )
-               return bt_splitroot (bt, set, fencekey, right);
+               return bt_splitroot (bt, set, (BtKey *)fencekey, right);
 
        // insert new fences in their parent pages
 
@@ -2232,15 +2263,17 @@ BtVal val;
        // insert new fence for reformulated left block of smaller keys
 
        bt_putid (value, set->page_no);
+       ptr = (BtKey*)fencekey;
 
-       if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
+       if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
                return bt->err;
 
        // switch fence for right block of larger keys to new right page
 
        bt_putid (value, right->page_no);
+       ptr = (BtKey*)rightkey;
 
-       if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
+       if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
                return bt->err;
 
        bt_unlockpage (BtLockParent, set->latch);
@@ -2249,6 +2282,7 @@ BtVal val;
 
        bt_unlockpage (BtLockParent, right->latch);
        bt_unpinlatch (right->latch);
+       bt_unpinpool (right->pool);
        return 0;
 }
 
@@ -2260,6 +2294,8 @@ BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uin
 {
 uint idx, librarian;
 BtSlot *node;
+BtKey *ptr;
+BtVal *val;
 
        //      if found slot > desired slot and previous slot
        //      is a librarian slot, use it
@@ -2270,16 +2306,18 @@ BtSlot *node;
 
        // copy value onto page
 
-       set->page->min -= vallen + 1; // reset lowest used offset
-       ((unsigned char *)set->page)[set->page->min] = vallen;
-       memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
+       set->page->min -= vallen + sizeof(BtVal);
+       val = (BtVal*)((unsigned char *)set->page + set->page->min);
+       memcpy (val->value, value, vallen);
+       val->len = vallen;
 
        // copy key onto page
 
-       set->page->min -= keylen + 1; // reset lowest used offset
-       ((unsigned char *)set->page)[set->page->min] = keylen;
-       memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
-
+       set->page->min -= keylen + sizeof(BtKey);
+       ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
+       memcpy (ptr->key, key, keylen);
+       ptr->len = keylen;
+       
        //      find first empty slot
 
        for( idx = slot; idx < set->page->cnt; idx++ )
@@ -2325,18 +2363,19 @@ BtSlot *node;
 
 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
 {
-unsigned char newkey[256];
+unsigned char newkey[BT_keyarray];
 uint slot, idx, len;
 BtPageSet set[1];
+BtKey *ptr, *ins;
 uid sequence;
-BtKey ptr;
-BtVal val;
+BtVal *val;
 uint type;
 
   // set up the key we're working on
 
-  memcpy (newkey + 1, key, keylen);
-  newkey[0] = keylen;
+  ins = (BtKey*)newkey;
+  memcpy (ins->key, key, keylen);
+  ins->len = keylen;
 
   // is this a non-unique index value?
 
@@ -2345,12 +2384,12 @@ uint type;
   else {
        type = Duplicate;
        sequence = bt_newdup (bt);
-       bt_putid (newkey + *newkey + 1, sequence);
-       *newkey += BtId;
+       bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
+       ins->len += BtId;
   }
 
   while( 1 ) { // find the page and slot for the current key
-       if( slot = bt_loadpage (bt, set, newkey + 1, *newkey, lvl, BtLockWrite) )
+       if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
                ptr = keyptr(set->page, slot);
        else {
                if( !bt->err )
@@ -2373,19 +2412,21 @@ uint type;
        //      check for adequate space on the page
        //      and insert the new key before slot.
 
-       if( unique && (len != *newkey || memcmp (ptr->key, newkey+1, *newkey)) || !unique ) {
-         if( !(slot = bt_cleanpage (bt, set->page, *newkey, slot, vallen)) )
+       if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
+         if( !(slot = bt_cleanpage (bt, set->page, ins->len, slot, vallen)) )
                if( bt_splitpage (bt, set) )
                  return bt->err;
                else
                  continue;
 
-         return bt_insertslot (bt, set, slot, newkey + 1, *newkey, value, vallen, type);
+         return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
        }
 
        // if key already exists, update value and return
 
-       if( val = valptr(set->page, slot), val->len >= vallen ) {
+       val = valptr(set->page, slot);
+
+       if( val->len >= vallen ) {
                if( slotptr(set->page, slot)->dead )
                        set->page->act++;
                set->page->garbage += val->len - vallen;
@@ -2401,7 +2442,7 @@ uint type;
        //      new update value doesn't fit in existing value area
 
        if( !slotptr(set->page, slot)->dead )
-               set->page->garbage += val->len + ptr->len + 2;
+               set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
        else {
                slotptr(set->page, slot)->dead = 0;
                set->page->act++;
@@ -2413,13 +2454,15 @@ uint type;
          else
                continue;
 
-       set->page->min -= vallen + 1;
-       ((unsigned char *)set->page)[set->page->min] = vallen;
-       memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
+       set->page->min -= vallen + sizeof(BtVal);
+       val = (BtVal*)((unsigned char *)set->page + set->page->min);
+       memcpy (val->value, value, vallen);
+       val->len = vallen;
 
-       set->page->min -= keylen + 1;
-       ((unsigned char *)set->page)[set->page->min] = keylen;
-       memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
+       set->page->min -= keylen + sizeof(BtKey);
+       ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
+       memcpy (ptr->key, key, keylen);
+       ptr->len = keylen;
        
        slotptr(set->page, slot)->off = set->page->min;
        bt_unlockpage(BtLockWrite, set->latch);
@@ -2553,12 +2596,12 @@ uint slot;
        return slot;
 }
 
-BtKey bt_key(BtDb *bt, uint slot)
+BtKey *bt_key(BtDb *bt, uint slot)
 {
        return keyptr(bt->cursor, slot);
 }
 
-BtVal bt_val(BtDb *bt, uint slot)
+BtVal *bt_val(BtDb *bt, uint slot)
 {
        return valptr(bt->cursor,slot);
 }
@@ -2626,13 +2669,25 @@ struct timeval tv[1];
 }
 #endif
 
+void bt_poolaudit (BtMgr *mgr)
+{
+uint slot = 0;
+BtPool *pool;
+
+       while( slot++ < mgr->poolcnt ) {
+               pool = mgr->pool + slot;
+               if( pool->pin & ~CLOCK_bit )
+                 fprintf(stderr, "pool slot %d pinned\n", slot);
+       }
+}
+
 uint bt_latchaudit (BtDb *bt)
 {
 ushort idx, hashidx;
 uid next, page_no;
 BtLatchSet *latch;
 uint cnt = 0;
-BtKey ptr;
+BtKey *ptr;
 
 #ifdef unix
        posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
@@ -2698,7 +2753,7 @@ BtKey ptr;
                if( !bt->frame->free ) {
                 for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
                  ptr = keyptr(bt->frame, idx+1);
-                 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
+                 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) > 0 )
                        fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
                 }
                 if( !bt->frame->lvl )
@@ -2728,18 +2783,21 @@ void *index_file (void *arg)
 uint __stdcall index_file (void *arg)
 #endif
 {
-int line = 0, found = 0, cnt = 0;
+int line = 0, found = 0, cnt = 0, unique;
 uid next, page_no = LEAF_page; // start on first page of leaves
-unsigned char key[256];
+unsigned char key[BT_maxkey];
 ThreadArg *args = arg;
 int ch, len = 0, slot;
 BtPageSet set[1];
-BtKey ptr;
+BtKey *ptr;
+BtVal *val;
 BtDb *bt;
 FILE *in;
 
        bt = bt_open (args->mgr);
 
+       unique = (args->type[1] | 0x20) != 'd';
+
        switch(args->type[0] | 0x20)
        {
        case 'a':
@@ -2756,11 +2814,11 @@ FILE *in;
                        {
                          line++;
 
-                         if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 0) )
+                         if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
-                       else if( len < 255 )
+                       else if( len < BT_maxkey )
                                key[len++] = ch;
                fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
                break;
@@ -2779,11 +2837,11 @@ FILE *in;
                          else if( args->num )
                                sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
 
-                         if( bt_insertkey (bt, key, len, 0, NULL, 0, 0) )
+                         if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
-                       else if( len < 255 )
+                       else if( len < BT_maxkey )
                                key[len++] = ch;
                fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
                break;
@@ -2803,15 +2861,16 @@ FILE *in;
 
                          if( bt_findkey (bt, key, len, NULL, 0) < 0 )
                                fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
-                         ptr = (BtKey)(bt->key);
+                         ptr = (BtKey*)(bt->key);
                          found++;
 
                          if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
-                       else if( len < 255 )
+                       else if( len < BT_maxkey )
                                key[len++] = ch;
+
                fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
                break;
 
@@ -2834,7 +2893,7 @@ FILE *in;
                                fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
                          len = 0;
                        }
-                       else if( len < 255 )
+                       else if( len < BT_maxkey )
                                key[len++] = ch;
                fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
                break;
@@ -2846,6 +2905,7 @@ FILE *in;
                                set->page = bt_page (bt, set->pool, page_no);
                        else
                                break;
+                       madvise (set->page, bt->mgr->page_size, MADV_WILLNEED);
                        set->latch = bt_pinlatch (bt, page_no);
                        bt_lockpage (BtLockRead, set->latch);
                        next = bt_getid (set->page->right);
@@ -2860,6 +2920,8 @@ FILE *in;
                                        len -= BtId;
 
                                fwrite (ptr->key, len, 1, stdout);
+                               val = valptr(set->page, slot);
+                               fwrite (val->value, val->len, 1, stdout);
                                fputc ('\n', stdout);
                                cnt++;
                           }
@@ -2886,6 +2948,8 @@ FILE *in;
                                len -= BtId;
 
                        fwrite (ptr->key, len, 1, stdout);
+                       val = valptr(bt->cursor, slot);
+                       fwrite (val->value, val->len, 1, stdout);
                        fputc ('\n', stdout);
                        cnt++;
                  }
@@ -2954,7 +3018,7 @@ float elapsed;
 int num = 0;
 char key[1];
 BtMgr *mgr;
-BtKey ptr;
+BtKey *ptr;
 BtDb *bt;
 
        if( argc < 3 ) {
@@ -3039,6 +3103,7 @@ BtDb *bt;
        elapsed = getCpuTime(2);
        fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
 
+       bt_poolaudit (mgr);
        bt_mgrclose (mgr);
 }