]> pd.if.org Git - btree/commitdiff
Threadskv5.c -- configure key & value length fields to ushorts and uints.
authorunknown <karl@E04.petzent.com>
Tue, 9 Sep 2014 19:53:53 +0000 (12:53 -0700)
committerunknown <karl@E04.petzent.com>
Tue, 9 Sep 2014 19:53:53 +0000 (12:53 -0700)
Support for raw disk devices as btrees on linux.

threadskv5.c

index 83bfa1b532da4486bcd998a63aefa3a4843a1410..b5df26d617fe1e242a86a87a593163ae94878553 100644 (file)
@@ -5,7 +5,7 @@
 //     duplicate key management
 //     and bi-directional cursors
 
-// 08 SEP 2014
+// 09 SEP 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -144,7 +144,7 @@ typedef struct {
        volatile uid page_no;           // latch set page number
 } BtLatchSet;
 
-//     Define the length of the page and key pointers
+//     Define the length of the page record numbers
 
 #define BtId 6
 
@@ -184,17 +184,20 @@ typedef struct {
 //     bytes.
 
 typedef struct {
-       unsigned char len;
-       unsigned char key[1];
-} *BtKey;
+       unsigned char len;              // this can be changed to a ushort or uint
+       unsigned char key[0];
+} BtKey;
 
 //     the value structure also occupies space at the upper
 //     end of the page. Each key is immediately followed by a value.
 
 typedef struct {
-       unsigned char len;
-       unsigned char value[1];
-} *BtVal;
+       unsigned char len;              // this can be changed to a ushort or uint
+       unsigned char value[0];
+} BtVal;
+
+#define BT_maxkey      255             // maximum number of bytes in a key
+#define BT_keyarray (BT_maxkey + sizeof(BtKey))
 
 //     The first part of an index page.
 //     It is immediately followed
@@ -289,9 +292,9 @@ typedef struct {
        BtMgr *mgr;                             // buffer manager for thread
        BtPage cursor;                  // cached frame for start/next (never mapped)
        BtPage frame;                   // spare frame for the page split (never mapped)
-       uid cursor_page;                        // current cursor page number   
-       unsigned char *mem;             // frame, cursor, page memory buffer
-       unsigned char key[256]; // last found complete key
+       uid cursor_page;                                // current cursor page number   
+       unsigned char *mem;                             // frame, cursor, page memory buffer
+       unsigned char key[BT_keyarray]; // last found complete key
        int found;                              // last delete or insert was found
        int err;                                // last error
 } BtDb;
@@ -312,7 +315,7 @@ extern BtDb *bt_open (BtMgr *mgr);
 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
-extern BtKey bt_foundkey (BtDb *bt);
+extern BtKey *bt_foundkey (BtDb *bt);
 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
 extern uint bt_nextkey   (BtDb *bt, uint slot);
 
@@ -323,8 +326,8 @@ void bt_mgrclose (BtMgr *mgr);
 //  Helper functions to return slot values
 //     from the cursor page.
 
-extern BtKey bt_key (BtDb *bt, uint slot);
-extern BtVal bt_val (BtDb *bt, uint slot);
+extern BtKey *bt_key (BtDb *bt, uint slot);
+extern BtVal *bt_val (BtDb *bt, uint slot);
 
 //  BTree page number constants
 #define ALLOC_page             0       // allocation & latch manager hash table
@@ -388,8 +391,8 @@ extern BtVal bt_val (BtDb *bt, uint slot);
 //     Page slots use 1 based indexing.
 
 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
-#define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
-#define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
+#define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
+#define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
 
 void bt_putid(unsigned char *dest, uid id)
 {
@@ -839,13 +842,13 @@ BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uin
 uint lvl, attr, cacheblk, last, slot, idx;
 uint nlatchpage, latchhash;
 unsigned char value[BtId];
+int flag, initit = 0;
 BtLatchMgr *latchmgr;
 off64_t size;
 uint amt[1];
 BtMgr* mgr;
-BtKey key;
-BtVal val;
-int flag;
+BtKey* key;
+BtVal *val;
 
 #ifndef unix
 SYSTEM_INFO sysinfo[1];
@@ -889,10 +892,15 @@ SYSTEM_INFO sysinfo[1];
        *amt = 0;
 
        // read minimum page size to get root info
+       //      to support raw disk partition files
+       //      check if bits == 0 on the disk.
 
        if( size = lseek (mgr->idx, 0L, 2) ) {
                if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
-                       bits = latchmgr->alloc->bits;
+                       if( latchmgr->alloc->bits )
+                               bits = latchmgr->alloc->bits;
+                       else
+                               initit = 1;
                else
                        return free(mgr), free(latchmgr), NULL;
        } else if( mode == BT_ro )
@@ -907,6 +915,8 @@ SYSTEM_INFO sysinfo[1];
                bits = latchmgr->alloc->bits;
        } else if( mode == BT_ro )
                return bt_mgrclose (mgr), NULL;
+       else
+               initit = 1;
 #endif
 
        mgr->page_size = 1 << bits;
@@ -944,7 +954,7 @@ SYSTEM_INFO sysinfo[1];
        mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
 #endif
 
-       if( size || *amt )
+       if( !initit )
                goto mgrlatch;
 
        // initialize an empty b-tree with latch page, root page, page of leaves
@@ -975,7 +985,7 @@ SYSTEM_INFO sysinfo[1];
        bt_putid (latchmgr->alloc->left, LEAF_page);
 
 #ifdef unix
-       if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
+       if( pwrite (mgr->idx, latchmgr, mgr->page_size, 0) < mgr->page_size )
                return bt_mgrclose (mgr), NULL;
 #else
        if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
@@ -989,7 +999,7 @@ SYSTEM_INFO sysinfo[1];
        latchmgr->alloc->bits = mgr->page_bits;
 
        for( lvl=MIN_lvl; lvl--; ) {
-               slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
+               slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
                key = keyptr(latchmgr->alloc, 1);
                key->len = 2;           // create stopper key
                key->key[0] = 0xff;
@@ -1005,7 +1015,7 @@ SYSTEM_INFO sysinfo[1];
                latchmgr->alloc->cnt = 1;
                latchmgr->alloc->act = 1;
 #ifdef unix
-               if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
+               if( pwrite (mgr->idx, latchmgr, mgr->page_size, (MIN_lvl - lvl) << bits) < mgr->page_size )
                        return bt_mgrclose (mgr), NULL;
 #else
                if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
@@ -1093,7 +1103,7 @@ BtDb *bt = malloc (sizeof(*bt));
 //  compare two keys, returning > 0, = 0, or < 0
 //  as the comparison value
 
-int keycmp (BtKey key1, unsigned char *key2, uint len2)
+int keycmp (BtKey* key1, unsigned char *key2, uint len2)
 {
 uint len1 = key1->len;
 int ans;
@@ -1593,30 +1603,21 @@ void bt_freepage (BtDb *bt, BtPageSet *set)
 
 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
 {
-unsigned char leftkey[256], rightkey[256];
+unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
 unsigned char value[BtId];
-BtKey ptr;
+BtKey* ptr;
 uint idx;
 
-       // collapse empty slots beneath our slot
-
-       while( idx = set->page->cnt - 1 )
-         if( slotptr(set->page, idx)->dead ) {
-               *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
-               memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
-         } else
-               break;
-
        //      remove the old fence value
 
        ptr = keyptr(set->page, set->page->cnt);
-       memcpy (rightkey, ptr, ptr->len + 1);
+       memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
        memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
 
        //  cache new fence value
 
        ptr = keyptr(set->page, set->page->cnt);
-       memcpy (leftkey, ptr, ptr->len + 1);
+       memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
 
        bt_lockpage (BtLockParent, set->latch);
        bt_unlockpage (BtLockWrite, set->latch);
@@ -1624,13 +1625,16 @@ uint idx;
        //      insert new (now smaller) fence key
 
        bt_putid (value, set->page_no);
+       ptr = (BtKey*)leftkey;
 
-       if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId, 1) )
+       if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
          return bt->err;
 
        //      now delete old fence key
 
-       if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
+       ptr = (BtKey*)rightkey;
+
+       if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
                return bt->err;
 
        bt_unlockpage (BtLockParent, set->latch);
@@ -1711,12 +1715,12 @@ BtPageSet right2[1];
 
 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
 {
-unsigned char lowerfence[256], higherfence[256];
+unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
 BtPageSet set[1], right[1], right2[1];
+uint slot, idx, found, fence;
 unsigned char value[BtId];
-uint slot, idx, found;
-BtKey ptr, tst;
-BtVal val;
+BtKey *ptr, *tst;
+BtVal *val;
 
        if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
                ptr = keyptr(set->page, slot);
@@ -1728,19 +1732,30 @@ BtVal val;
        if( slotptr(set->page, slot)->type == Librarian )
                ptr = keyptr(set->page, ++slot);
 
+       fence = slot == set->page->cnt;
+
        // if key is found delete it, otherwise ignore request
 
        if( found = !keycmp (ptr, key, len) )
          if( found = slotptr(set->page, slot)->dead == 0 ) {
                val = valptr(set->page,slot);
                slotptr(set->page, slot)->dead = 1;
-               set->page->garbage += ptr->len + val->len + 2;
+               set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
                set->page->act--;
+
+               // collapse empty slots beneath the fence
+
+               while( idx = set->page->cnt - 1 )
+                 if( slotptr(set->page, idx)->dead ) {
+                       *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
+                       memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
+                 } else
+                       break;
          }
 
        //      did we delete a fence key in an upper level?
 
-       if( found && lvl && set->page->act && slot == set->page->cnt )
+       if( found && lvl && set->page->act && fence )
          if( bt_fixfence (bt, set, lvl) )
                return bt->err;
          else
@@ -1767,7 +1782,7 @@ BtVal val;
        //      to post in parent
 
        ptr = keyptr(set->page, set->page->cnt);
-       memcpy (lowerfence, ptr, ptr->len + 1);
+       memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
 
        //      obtain lock on right page
 
@@ -1802,7 +1817,7 @@ BtVal val;
        // cache copy of key to update
 
        ptr = keyptr(right->page, right->page->cnt);
-       memcpy (higherfence, ptr, ptr->len + 1);
+       memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
 
        // mark right page deleted and point it to left page
        //      until we can post parent updates
@@ -1819,13 +1834,16 @@ BtVal val;
        // redirect higher key directly to our new node contents
 
        bt_putid (value, set->page_no);
+       ptr = (BtKey*)higherfence;
 
-       if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId, 1) )
+       if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
          return bt->err;
 
        //      delete old lower key to our node
 
-       if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
+       ptr = (BtKey*)lowerfence;
+
+       if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
          return bt->err;
 
        //      obtain delete and write locks to right node
@@ -1842,9 +1860,9 @@ BtVal val;
        return 0;
 }
 
-BtKey bt_foundkey (BtDb *bt)
+BtKey *bt_foundkey (BtDb *bt)
 {
-       return (BtKey)(bt->key);
+       return (BtKey*)(bt->key);
 }
 
 //     advance to next slot
@@ -1897,8 +1915,8 @@ int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value,
 BtPageSet set[1];
 uint len, slot;
 int ret = -1;
-BtKey ptr;
-BtVal val;
+BtKey *ptr;
+BtVal *val;
 
   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
    do {
@@ -1911,7 +1929,7 @@ BtVal val;
 
        //      return actual key found
 
-       memcpy (bt->key, ptr, ptr->len + 1);
+       memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
        len = ptr->len;
 
        if( slotptr(set->page, slot)->type == Duplicate )
@@ -1953,10 +1971,10 @@ uint nxt = bt->mgr->page_size;
 uint cnt = 0, idx = 0;
 uint max = page->cnt;
 uint newslot = max;
-BtKey key;
-BtVal val;
+BtKey *key;
+BtVal *val;
 
-       if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
+       if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
                return slot;
 
        //      skip cleanup and proceed to split
@@ -1986,15 +2004,15 @@ BtVal val;
                // copy the value across
 
                val = valptr(bt->frame, cnt);
-               nxt -= val->len + 1;
+               nxt -= val->len + sizeof(BtVal);
                ((unsigned char *)page)[nxt] = val->len;
-               memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
+               memcpy ((unsigned char *)page + nxt + sizeof(BtVal), val->value, val->len);
 
                // copy the key across
 
                key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)page + nxt, key, key->len + 1);
+               nxt -= key->len + sizeof(BtKey);
+               memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
 
                // make a librarian slot
 
@@ -2018,7 +2036,7 @@ BtVal val;
 
        //      see if page has enough space now, or does it need splitting?
 
-       if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
+       if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
                return newslot;
 
        return 0;
@@ -2026,11 +2044,13 @@ BtVal val;
 
 // split the root and raise the height of the btree
 
-BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, BtPageSet *right)
+BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
 {
 uint nxt = bt->mgr->page_size;
 unsigned char value[BtId];
 BtPageSet left[1];
+BtKey *ptr;
+BtVal *val;
 
        //  Obtain an empty page to use, and copy the current
        //  root contents into it, e.g. lower keys
@@ -2051,30 +2071,34 @@ BtPageSet left[1];
 
        memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
 
+       // insert stopper key at top of newroot page
+       // and increase the root height
+
+       nxt -= BtId + sizeof(BtVal);
+       bt_putid (value, right->page_no);
+       val = (BtVal *)((unsigned char *)root->page + nxt);
+       memcpy (val->value, value, BtId);
+       val->len = BtId;
+
+       nxt -= 2 + sizeof(BtKey);
+       slotptr(root->page, 2)->off = nxt;
+       ptr = (BtKey *)((unsigned char *)root->page + nxt);
+       ptr->len = 2;
+       ptr->key[0] = 0xff;
+       ptr->key[1] = 0xff;
+
        // insert lower keys page fence key on newroot page as first key
 
-       nxt -= BtId + 1;
+       nxt -= BtId + sizeof(BtVal);
        bt_putid (value, left->page_no);
-       ((unsigned char *)root->page)[nxt] = BtId;
-       memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
+       val = (BtVal *)((unsigned char *)root->page + nxt);
+       memcpy (val->value, value, BtId);
+       val->len = BtId;
 
-       nxt -= *leftkey + 1;
-       memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
+       nxt -= leftkey->len + sizeof(BtKey);
        slotptr(root->page, 1)->off = nxt;
+       memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
        
-       // insert stopper key on newroot page
-       // and increase the root height
-
-       nxt -= 3 + BtId + 1;
-       ((unsigned char *)root->page)[nxt] = 2;
-       ((unsigned char *)root->page)[nxt+1] = 0xff;
-       ((unsigned char *)root->page)[nxt+2] = 0xff;
-
-       bt_putid (value, right->page_no);
-       ((unsigned char *)root->page)[nxt+3] = BtId;
-       memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
-       slotptr(root->page, 2)->off = nxt;
-
        bt_putid(root->page->right, 0);
        root->page->min = nxt;          // reset lowest used offset and key count
        root->page->cnt = 2;
@@ -2094,15 +2118,15 @@ BtPageSet left[1];
 
 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
 {
+unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
-unsigned char fencekey[256], rightkey[256];
 unsigned char value[BtId];
 uint lvl = set->page->lvl;
 BtPageSet right[1];
+BtKey *key, *ptr;
+BtVal *val, *src;
 uid right2;
 uint prev;
-BtKey key;
-BtVal val;
 
        //  split higher half of keys to bt->frame
 
@@ -2114,14 +2138,16 @@ BtVal val;
        while( cnt++ < max ) {
                if( slotptr(set->page, cnt)->dead && cnt < max )
                        continue;
-               val = valptr(set->page, cnt);
-               nxt -= val->len + 1;
-               ((unsigned char *)bt->frame)[nxt] = val->len;
-               memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
+               src = valptr(set->page, cnt);
+               nxt -= src->len + sizeof(BtVal);
+               val = (BtVal*)((unsigned char *)bt->frame + nxt);
+               memcpy (val->value, src->value, src->len);
+               val->len = src->len;
 
                key = keyptr(set->page, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
+               nxt -= key->len + sizeof(BtKey);
+               ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
+               memcpy (ptr, key, key->len + sizeof(BtKey));
 
                //      add librarian slot
 
@@ -2142,7 +2168,7 @@ BtVal val;
 
        // remember existing fence key for new page to the right
 
-       memcpy (rightkey, key, key->len + 1);
+       memcpy (rightkey, key, key->len + sizeof(BtKey));
 
        bt->frame->bits = bt->mgr->page_bits;
        bt->frame->min = nxt;
@@ -2190,13 +2216,13 @@ BtVal val;
                if( slotptr(bt->frame, cnt)->dead )
                        continue;
                val = valptr(bt->frame, cnt);
-               nxt -= val->len + 1;
+               nxt -= val->len + sizeof(BtVal);
                ((unsigned char *)set->page)[nxt] = val->len;
-               memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
+               memcpy ((unsigned char *)set->page + nxt + sizeof(BtVal), val->value, val->len);
 
                key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
+               nxt -= key->len + sizeof(BtKey);
+               memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
 
                //      add librarian slot
 
@@ -2215,7 +2241,7 @@ BtVal val;
 
        // remember fence key for smaller page
 
-       memcpy(fencekey, key, key->len + 1);
+       memcpy(fencekey, key, key->len + sizeof(BtKey));
 
        bt_putid(set->page->right, right->page_no);
        set->page->min = nxt;
@@ -2224,7 +2250,7 @@ BtVal val;
        // if current page is the root page, split it
 
        if( set->page_no == ROOT_page )
-               return bt_splitroot (bt, set, fencekey, right);
+               return bt_splitroot (bt, set, (BtKey *)fencekey, right);
 
        // insert new fences in their parent pages
 
@@ -2265,6 +2291,8 @@ BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uin
 {
 uint idx, librarian;
 BtSlot *node;
+BtKey *ptr;
+BtVal *val;
 
        //      if found slot > desired slot and previous slot
        //      is a librarian slot, use it
@@ -2275,16 +2303,18 @@ BtSlot *node;
 
        // copy value onto page
 
-       set->page->min -= vallen + 1; // reset lowest used offset
-       ((unsigned char *)set->page)[set->page->min] = vallen;
-       memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
+       set->page->min -= vallen + sizeof(BtVal);
+       val = (BtVal*)((unsigned char *)set->page + set->page->min);
+       memcpy (val->value, value, vallen);
+       val->len = vallen;
 
        // copy key onto page
 
-       set->page->min -= keylen + 1; // reset lowest used offset
-       ((unsigned char *)set->page)[set->page->min] = keylen;
-       memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
-
+       set->page->min -= keylen + sizeof(BtKey);
+       ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
+       memcpy (ptr->key, key, keylen);
+       ptr->len = keylen;
+       
        //      find first empty slot
 
        for( idx = slot; idx < set->page->cnt; idx++ )
@@ -2330,18 +2360,19 @@ BtSlot *node;
 
 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
 {
-unsigned char newkey[256];
+unsigned char newkey[BT_keyarray];
 uint slot, idx, len;
 BtPageSet set[1];
+BtKey *ptr, *ins;
 uid sequence;
-BtKey ptr;
-BtVal val;
+BtVal *val;
 uint type;
 
   // set up the key we're working on
 
-  memcpy (newkey + 1, key, keylen);
-  newkey[0] = keylen;
+  ins = (BtKey*)newkey;
+  memcpy (ins->key, key, keylen);
+  ins->len = keylen;
 
   // is this a non-unique index value?
 
@@ -2350,12 +2381,12 @@ uint type;
   else {
        type = Duplicate;
        sequence = bt_newdup (bt);
-       bt_putid (newkey + *newkey + 1, sequence);
-       *newkey += BtId;
+       bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
+       ins->len += BtId;
   }
 
   while( 1 ) { // find the page and slot for the current key
-       if( slot = bt_loadpage (bt, set, newkey + 1, *newkey, lvl, BtLockWrite) )
+       if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
                ptr = keyptr(set->page, slot);
        else {
                if( !bt->err )
@@ -2378,19 +2409,21 @@ uint type;
        //      check for adequate space on the page
        //      and insert the new key before slot.
 
-       if( unique && (len != *newkey || memcmp (ptr->key, newkey+1, *newkey)) || !unique ) {
-         if( !(slot = bt_cleanpage (bt, set->page, *newkey, slot, vallen)) )
+       if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
+         if( !(slot = bt_cleanpage (bt, set->page, ins->len, slot, vallen)) )
                if( bt_splitpage (bt, set) )
                  return bt->err;
                else
                  continue;
 
-         return bt_insertslot (bt, set, slot, newkey + 1, *newkey, value, vallen, type);
+         return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
        }
 
        // if key already exists, update value and return
 
-       if( val = valptr(set->page, slot), val->len >= vallen ) {
+       val = valptr(set->page, slot);
+
+       if( val->len >= vallen ) {
                if( slotptr(set->page, slot)->dead )
                        set->page->act++;
                set->page->garbage += val->len - vallen;
@@ -2406,7 +2439,7 @@ uint type;
        //      new update value doesn't fit in existing value area
 
        if( !slotptr(set->page, slot)->dead )
-               set->page->garbage += val->len + ptr->len + 2;
+               set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
        else {
                slotptr(set->page, slot)->dead = 0;
                set->page->act++;
@@ -2418,13 +2451,15 @@ uint type;
          else
                continue;
 
-       set->page->min -= vallen + 1;
-       ((unsigned char *)set->page)[set->page->min] = vallen;
-       memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
+       set->page->min -= vallen + sizeof(BtVal);
+       val = (BtVal*)((unsigned char *)set->page + set->page->min);
+       memcpy (val->value, value, vallen);
+       val->len = vallen;
 
-       set->page->min -= keylen + 1;
-       ((unsigned char *)set->page)[set->page->min] = keylen;
-       memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
+       set->page->min -= keylen + sizeof(BtKey);
+       ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
+       memcpy (ptr->key, key, keylen);
+       ptr->len = keylen;
        
        slotptr(set->page, slot)->off = set->page->min;
        bt_unlockpage(BtLockWrite, set->latch);
@@ -2558,12 +2593,12 @@ uint slot;
        return slot;
 }
 
-BtKey bt_key(BtDb *bt, uint slot)
+BtKey *bt_key(BtDb *bt, uint slot)
 {
        return keyptr(bt->cursor, slot);
 }
 
-BtVal bt_val(BtDb *bt, uint slot)
+BtVal *bt_val(BtDb *bt, uint slot)
 {
        return valptr(bt->cursor,slot);
 }
@@ -2637,7 +2672,7 @@ ushort idx, hashidx;
 uid next, page_no;
 BtLatchSet *latch;
 uint cnt = 0;
-BtKey ptr;
+BtKey *ptr;
 
 #ifdef unix
        posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
@@ -2733,18 +2768,21 @@ void *index_file (void *arg)
 uint __stdcall index_file (void *arg)
 #endif
 {
-int line = 0, found = 0, cnt = 0;
+int line = 0, found = 0, cnt = 0, unique;
 uid next, page_no = LEAF_page; // start on first page of leaves
-unsigned char key[256];
+unsigned char key[BT_maxkey];
 ThreadArg *args = arg;
 int ch, len = 0, slot;
 BtPageSet set[1];
-BtKey ptr;
+BtKey *ptr;
+BtVal *val;
 BtDb *bt;
 FILE *in;
 
        bt = bt_open (args->mgr);
 
+       unique = (args->type[1] | 0x20) != 'd';
+
        switch(args->type[0] | 0x20)
        {
        case 'a':
@@ -2761,11 +2799,11 @@ FILE *in;
                        {
                          line++;
 
-                         if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 0) )
+                         if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
-                       else if( len < 255 )
+                       else if( len < BT_maxkey )
                                key[len++] = ch;
                fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
                break;
@@ -2784,11 +2822,11 @@ FILE *in;
                          else if( args->num )
                                sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
 
-                         if( bt_insertkey (bt, key, len, 0, NULL, 0, 0) )
+                         if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
-                       else if( len < 255 )
+                       else if( len < BT_maxkey )
                                key[len++] = ch;
                fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
                break;
@@ -2808,14 +2846,14 @@ FILE *in;
 
                          if( bt_findkey (bt, key, len, NULL, 0) < 0 )
                                fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
-                         ptr = (BtKey)(bt->key);
+                         ptr = (BtKey*)(bt->key);
                          found++;
 
                          if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
-                       else if( len < 255 )
+                       else if( len < BT_maxkey )
                                key[len++] = ch;
                fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
                break;
@@ -2839,7 +2877,7 @@ FILE *in;
                                fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
                          len = 0;
                        }
-                       else if( len < 255 )
+                       else if( len < BT_maxkey )
                                key[len++] = ch;
                fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
                break;
@@ -2865,6 +2903,8 @@ FILE *in;
                                        len -= BtId;
 
                                fwrite (ptr->key, len, 1, stdout);
+                               val = valptr(set->page, slot);
+                               fwrite (val->value, val->len, 1, stdout);
                                fputc ('\n', stdout);
                                cnt++;
                           }
@@ -2891,6 +2931,8 @@ FILE *in;
                                len -= BtId;
 
                        fwrite (ptr->key, len, 1, stdout);
+                       val = valptr(bt->cursor, slot);
+                       fwrite (val->value, val->len, 1, stdout);
                        fputc ('\n', stdout);
                        cnt++;
                  }
@@ -2959,7 +3001,7 @@ float elapsed;
 int num = 0;
 char key[1];
 BtMgr *mgr;
-BtKey ptr;
+BtKey *ptr;
 BtDb *bt;
 
        if( argc < 3 ) {