]> pd.if.org Git - btree/commitdiff
Threadskv10f.c -- add finding keys in addition to penny-sort insert to LSM btree...
authorunknown <karl@E04.petzent.com>
Tue, 28 Oct 2014 22:27:19 +0000 (15:27 -0700)
committerunknown <karl@E04.petzent.com>
Tue, 28 Oct 2014 22:27:19 +0000 (15:27 -0700)
threadskv10f.c [moved from threadskv10e.c with 92% similarity]

similarity index 92%
rename from threadskv10e.c
rename to threadskv10f.c
index fa5fdfbd7eea769f7c4a0193e857b2bbf1b32cff..fe6f3691f91ce8868081ab98aa6825aefc8bc5d2 100644 (file)
@@ -1,4 +1,4 @@
-// btree version threadskv10e futex version
+// btree version threadskv10f futex version
 //     with reworked bt_deletekey code,
 //     phase-fair re-entrant reader writer lock,
 //     librarian page split code,
@@ -8,9 +8,10 @@
 //     ACID batched key-value updates
 //     redo log for failure recovery
 //     LSM B-trees for write optimization
-//     and larger sized leaf pages than non-leaf
+//     larger sized leaf pages than non-leaf
+//     and LSM B-tree find operations
 
-// 24 OCT 2014
+// 28 OCT 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -127,10 +128,10 @@ typedef struct {
 typedef struct {
   MutexLatch xcl[1];
   MutexLatch wrt[1];
-  uint readers;
+  ushort readers;
   ushort dup;          // re-entrant locks
   ushort tid;          // owner thread-no
-  uint line;           // owner line #
+  ushort line;         // owner line #
 } RWLock;
 
 //  hash table entries
@@ -144,11 +145,11 @@ typedef struct {
 
 typedef struct {
        uid page_no;                    // latch set page number
-       MutexLatch modify[1];   // modify entry lite latch
        RWLock readwr[1];               // read/write page lock
        RWLock access[1];               // Access Intent/Page delete
        RWLock parent[1];               // Posting of fence key in parent
        RWLock link[1];                 // left link update in progress
+       MutexLatch modify[1];   // modify entry lite latch
        uint split;                             // right split page atomic insert
        uint next;                              // next entry in hash table chain
        uint prev;                              // prev entry in hash table chain
@@ -249,6 +250,7 @@ typedef struct {
        unsigned char freechain[BtId];  // head of free page_nos chain
        unsigned char leafchain[BtId];  // head of leaf page_nos chain
        unsigned long long leafpages;   // number of active leaf pages
+       unsigned long long upperpages;  // number of active upper pages
        uint redopages;                                 // number of redo pages in file
        unsigned char leaf_xtra;                // leaf page size in xtra bits
        unsigned char page_bits;                // base page size in bits
@@ -306,7 +308,8 @@ typedef struct {
 typedef struct {
        BtMgr *mgr;                                     // buffer manager for entire process
        BtMgr *main;                            // buffer manager for main btree
-       BtPage cursor;                          // cached page frame for start/next
+       BtPage cachecursor;                     // cached page frame for cache btree
+       BtPage maincursor;                      // cached page frame for main btree
        ushort thread_no;                       // thread number
        unsigned char key[BT_keyarray]; // last found complete key
 } BtDb;
@@ -317,6 +320,7 @@ typedef struct {
        logseqno reqlsn;        // redo log seq no required
        uint entry:31;          // latch table entry number
        uint reuse:1;           // reused previous page
+       uint slot;                      // slot on page
 } AtomicTxn;
 
 //     Catastrophic errors
@@ -373,8 +377,8 @@ extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char
 
 extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
 extern uint bt_nextkey   (BtDb *bt, uint slot);
-extern uint bt_prevkey (BtDb *db, uint slot);
-extern uint bt_lastkey (BtDb *db);
+extern uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no);
+extern uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no);
 
 //     manager functions
 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
@@ -384,7 +388,7 @@ extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
 
 //     atomic transaction functions
 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
-BTERR bt_txnpromote (BtDb *bt);
+BTERR bt_promote (BtDb *bt);
 
 //  The page is allocated from low and hi ends.
 //  The key slots are allocated from the bottom,
@@ -432,7 +436,7 @@ BTERR bt_txnpromote (BtDb *bt);
 //     Access macros to address slot and key values from the page
 //     Page slots use 1 based indexing.
 
-#define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
+#define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
 
@@ -494,9 +498,11 @@ int bt_mutextry(MutexLatch *latch)
 
 void bt_releasemutex(MutexLatch *latch)
 {
-       *latch->bits->xcl = 0;
+MutexLatch prev[1];
 
-       if( *latch->bits->waiters )
+       *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
+
+       if( *prev->bits->waiters )
                sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
 }
 
@@ -1291,11 +1297,15 @@ uint slot;
 void bt_close (BtDb *bt)
 {
 #ifdef unix
-       if( bt->cursor )
-               free (bt->cursor);
+       if( bt->cachecursor )
+               free (bt->cachecursor);
+       if( bt->maincursor )
+               free (bt->maincursor);
 #else
-       if( bt->cursor)
-               VirtualFree (bt->cursor, 0, MEM_RELEASE);
+       if( bt->cachecursor)
+               VirtualFree (bt->cachecursor, 0, MEM_RELEASE);
+       if( bt->maincursor)
+               VirtualFree (bt->maincursor, 0, MEM_RELEASE);
 #endif
        free (bt);
 }
@@ -1416,6 +1426,7 @@ BtVal *val;
        pagezero->leaf_xtra = leafxtra;
 
        bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
+       pagezero->upperpages = 1;
        pagezero->leafpages = 1;
 
        //  initialize left-most LEAF page in
@@ -1438,6 +1449,11 @@ BtVal *val;
                        node->off <<= mgr->leaf_xtra;
 
                node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
+               node->type = Librarian;
+               node++->dead = 1;
+
+               node->off = node[-1].off;
+               key = keyptr(pagezero->alloc, 2);
                key = keyptr(pagezero->alloc, 1);
                key->len = 2;           // create stopper key
                key->key[0] = 0xff;
@@ -1450,7 +1466,7 @@ BtVal *val;
 
                pagezero->alloc->min = node->off;
                pagezero->alloc->lvl = lvl;
-               pagezero->alloc->cnt = 1;
+               pagezero->alloc->cnt = 2;
                pagezero->alloc->act = 1;
                pagezero->alloc->page_no = MIN_lvl - lvl;
 
@@ -1520,9 +1536,11 @@ BtDb *bt = malloc (sizeof(*bt));
        bt->main = main;
        bt->mgr = mgr;
 #ifdef unix
-       bt->cursor = valloc (mgr->page_size << mgr->leaf_xtra);
+       bt->cachecursor = valloc (mgr->page_size << mgr->leaf_xtra);
+       bt->maincursor = valloc (mgr->page_size << mgr->leaf_xtra);
 #else
-       bt->cursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
+       bt->cachecursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
+       bt->maincursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
 #endif
 #ifdef unix
        bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
@@ -1563,11 +1581,7 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
                ReadLock (latch->readwr, thread_no);
                break;
        case BtLockWrite:
-//if(latch->leaf)
-//fprintf(stderr, "WrtRqst %d by %d at %d\n", (uint)latch->page_no, thread_no, line); 
                WriteLock (latch->readwr, thread_no, line);
-//if(latch->leaf)
-//fprintf(stderr, "WrtLock %d by %d at %d\n", (uint)latch->page_no, thread_no, line); 
                break;
        case BtLockAccess:
                ReadLock (latch->access, thread_no);
@@ -1593,8 +1607,6 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
                ReadRelease (latch->readwr);
                break;
        case BtLockWrite:
-//if(latch->leaf)
-//fprintf(stderr, "Un Lock %d by %d at %d\n", (uint)latch->page_no, thread_no, line); 
                WriteRelease (latch->readwr);
                break;
        case BtLockAccess:
@@ -1621,9 +1633,10 @@ uint page_size = mgr->page_size, page_xtra = 0;
 unsigned char *freechain;
 uid page_no;
 
-       if( contents->lvl )
+       if( contents->lvl ) {
                freechain = mgr->pagezero->freechain;
-       else {
+               mgr->pagezero->upperpages++;
+       } else {
                freechain = mgr->pagezero->leafchain;
                mgr->pagezero->leafpages++;
                page_xtra = mgr->leaf_xtra;
@@ -1646,10 +1659,10 @@ uid page_no;
                bt_putid(freechain, bt_getid(set->page->right));
 
                //  the page is currently nopromote and this
-               //  will keep bt_txnpromote out.
+               //  will keep bt_promote out.
 
                //      contents will replace this bit
-               //  and pin will keep bt_txnpromote out
+               //  and pin will keep bt_promote out
 
                contents->page_no = page_no;
                contents->nopromote = 0;
@@ -1674,7 +1687,7 @@ uid page_no;
 //       fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
        bt_releasemutex(mgr->lock);
 
-       //      keep bt_txnpromote out of this page
+       //      keep bt_promote out of this page
        contents->nopromote = 1;
        contents->page_no = page_no;
        if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
@@ -1684,7 +1697,7 @@ uid page_no;
        else
                return mgr->err_thread = thread_no, mgr->err;
 
-       // now pin will keep bt_txnpromote out
+       // now pin will keep bt_promote out
 
        set->page->nopromote = 0;
        set->latch->dirty = 1;
@@ -1858,9 +1871,10 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
 {
 unsigned char *freechain;
 
-       if( set->page->lvl )
+       if( set->page->lvl ) {
                freechain = mgr->pagezero->freechain;
-       else {
+               mgr->pagezero->upperpages--;
+       } else {
                freechain = mgr->pagezero->leafchain;
                mgr->pagezero->leafpages--;
        }
@@ -2016,7 +2030,7 @@ BtKey *ptr;
                return mgr->line = __LINE__, mgr->err = BTERR_struct;
 
        // pull contents of right peer into our empty page
-       //      preserving our left page number.
+       //      preserving our left page number, and its right page number.
 
        bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
        page_no = bt_getid(set->page->left);
@@ -2027,6 +2041,8 @@ BtKey *ptr;
        set->page->page_no = set->latch->page_no;
        set->latch->dirty = 1;
 
+       //  fix left link from far right page
+
        if( right2 = bt_getid (right->page->right) ) {
          if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
                temp->page = bt_mappage (mgr, temp->latch);
@@ -2037,7 +2053,7 @@ BtKey *ptr;
          bt_putid (temp->page->left, set->latch->page_no);
          bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
          bt_unpinlatch (temp->latch, thread_no, __LINE__);
-       } else {        // page is rightmost
+       } else {        // our page is now rightmost
          bt_mutexlock (mgr->lock);
          bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
          bt_releasemutex(mgr->lock);
@@ -2047,6 +2063,7 @@ BtKey *ptr;
 
        right->latch->dirty = 1;
        right->page->kill = 1;
+       bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
 
        // redirect higher key directly to our new node contents
 
@@ -2056,9 +2073,7 @@ BtKey *ptr;
        if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
          return mgr->err;
 
-       bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
-
-       //      delete orignal fence key in parent
+       //      delete our orignal fence key in parent
        //      and unlock our page.
 
        ptr = (BtKey *)lowerfence;
@@ -2158,97 +2173,6 @@ BtVal *val;
        return 0;
 }
 
-//     advance to next slot
-
-uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
-{
-BtLatchSet *prevlatch;
-uid page_no;
-
-       if( slot < set->page->cnt )
-               return slot + 1;
-
-       prevlatch = set->latch;
-
-       if( page_no = bt_getid(set->page->right) )
-         if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
-               set->page = bt_mappage (bt->mgr, set->latch);
-         else
-               return 0;
-       else
-         return bt->mgr->err = BTERR_struct, bt->mgr->err_thread = bt->thread_no, bt->mgr->line = __LINE__, 0;
-
-       // obtain access lock using lock chaining with Access mode
-
-       bt_lockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__);
-
-       bt_unlockpage(BtLockRead, prevlatch, bt->thread_no, __LINE__);
-       bt_unpinlatch (prevlatch, bt->thread_no, __LINE__);
-
-       bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-       bt_unlockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__);
-       return 1;
-}
-
-//     find unique key == given key, or first duplicate key in
-//     leaf level and return number of value bytes
-//     or (-1) if not found.
-
-int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
-{
-BtPageSet set[1];
-uint len, slot;
-int ret = -1;
-BtKey *ptr;
-BtVal *val;
-
-  if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
-   do {
-       ptr = keyptr(set->page, slot);
-
-       //      skip librarian slot place holder
-
-       if( slotptr(set->page, slot)->type == Librarian )
-               ptr = keyptr(set->page, ++slot);
-
-       //      return actual key found
-
-       memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
-       len = ptr->len;
-
-       if( slotptr(set->page, slot)->type == Duplicate )
-               len -= BtId;
-
-       //      not there if we reach the stopper key
-
-       if( slot == set->page->cnt )
-         if( !bt_getid (set->page->right) )
-               break;
-
-       // if key exists, return >= 0 value bytes copied
-       //      otherwise return (-1)
-
-       if( slotptr(set->page, slot)->dead )
-               continue;
-
-       if( keylen == len )
-         if( !memcmp (ptr->key, key, len) ) {
-               val = valptr (set->page,slot);
-               if( valmax > val->len )
-                       valmax = val->len;
-               memcpy (value, val->value, valmax);
-               ret = valmax;
-         }
-
-       break;
-
-   } while( slot = bt_findnext (bt, set, slot) );
-
-  bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
-  bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
-  return ret;
-}
-
 //     check page for space available,
 //     clean if necessary and return
 //     0 - page needs splitting
@@ -2345,9 +2269,9 @@ BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread
 unsigned char leftkey[BT_keyarray];
 uint nxt = mgr->page_size;
 unsigned char value[BtId];
+BtPage frame, page;
 BtPageSet left[1];
 uid left_page_no;
-BtPage frame;
 BtKey *ptr;
 BtVal *val;
 
@@ -2369,6 +2293,11 @@ BtVal *val;
        bt_unpinlatch (left->latch, thread_no, __LINE__);
        free (frame);
 
+       //      left link the pages together
+
+       page = bt_mappage (mgr, right);
+       bt_putid (page->left, left_page_no);
+
        // preserve the page info at the bottom
        // of higher keys and set rest to zero
 
@@ -2431,9 +2360,9 @@ uint page_size = mgr->page_size;
 BtPageSet right[1], temp[1];
 uint cnt = 0, idx = 0, max;
 uint lvl = set->page->lvl;
-BtKey *key, *ptr;
-BtVal *val, *src;
 BtPage frame;
+BtKey *key;
+BtVal *val;
 uid right2;
 uint entry;
 uint prev;
@@ -2455,14 +2384,13 @@ uint prev;
                  if( slotptr(set->page, cnt)->dead )
                        continue;
 
-               src = valptr(set->page, cnt);
-               frame->min -= src->len + sizeof(BtVal);
-               memcpy ((unsigned char *)frame + frame->min, src, src->len + sizeof(BtVal));
+               val = valptr(set->page, cnt);
+               frame->min -= val->len + sizeof(BtVal);
+               memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
 
                key = keyptr(set->page, cnt);
                frame->min -= key->len + sizeof(BtKey);
-               ptr = (BtKey*)((unsigned char *)frame + frame->min);
-               memcpy (ptr, key, key->len + sizeof(BtKey));
+               memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
 
                //      add librarian slot
 
@@ -2487,7 +2415,9 @@ uint prev;
        if( set->latch->page_no > ROOT_page ) {
                right2 = bt_getid (set->page->right);
                bt_putid (frame->right, right2);
-               bt_putid (frame->left, set->latch->page_no);
+
+               if( linkleft )
+                       bt_putid (frame->left, set->latch->page_no);
        }
 
        //      get new free page and write higher keys to it.
@@ -2570,8 +2500,10 @@ BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread
 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
 unsigned char value[BtId];
 uint lvl = set->page->lvl;
+BtPageSet temp[1];
 BtPage page;
 BtKey *ptr;
+uid right2;
 
        // if current page is the root page, split it
 
@@ -2586,6 +2518,23 @@ BtKey *ptr;
        ptr = keyptr(page, page->cnt);
        memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
 
+       //      splice in far right page's left page_no
+
+       if( right2 = bt_getid (page->right) ) {
+         if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
+               temp->page = bt_mappage (mgr, temp->latch);
+         else
+               return 0;
+
+      bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
+         bt_putid (temp->page->left, right->page_no);
+         bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
+         bt_unpinlatch (temp->latch, thread_no, __LINE__);
+       } else {  // right page is far right page
+         bt_mutexlock (mgr->lock);
+         bt_putid (mgr->pagezero->alloc->left, right->page_no);
+         bt_releasemutex(mgr->lock);
+       }
        // insert new fences in their parent pages
 
        bt_lockpage (BtLockParent, right, thread_no, __LINE__);
@@ -2649,14 +2598,16 @@ int rate;
        memcpy (ptr->key, key, keylen);
        ptr->len = keylen;
        
-       //      find first empty slot
+       //      find first empty slot at or above our insert slot
 
        for( idx = slot; idx < set->page->cnt; idx++ )
          if( slotptr(set->page, idx)->dead )
                break;
 
-       // now insert key into array before slot,
-       //      adding as many librarian slots as
+       // now insert key into array before slot.
+
+       //      if we're going all the way to the top,
+       //      add as many librarian slots as
        //      makes sense.
 
        if( idx == set->page->cnt ) {
@@ -2680,21 +2631,23 @@ int rate;
        } else
                librarian = 0, rate = 0;
 
+       //  transfer slots and add librarian slots
+
        while( idx > slot ) {
-               //  transfer slot
                *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
-               idx--;
 
                //      add librarian slot per rate
 
                if( librarian )
-                if( (idx - slot + 1)/2 <= librarian * rate ) {
-                       node = slotptr(set->page, idx--);
+                if( (idx - slot)/2 <= librarian * rate ) {
+                       node = slotptr(set->page, --idx);
                        node->off = node[1].off;
                        node->type = Librarian;
                        node->dead = 1;
                        librarian--;
                  }
+
+               --idx;
        }
 
        set->latch->dirty = 1;
@@ -2724,19 +2677,15 @@ BtVal *val;
        if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
                node = slotptr(set->page, slot);
                ptr = keyptr(set->page, slot);
-       } else {
-               if( !mgr->err )
-                       mgr->line = __LINE__, mgr->err = BTERR_ovflw;
-               return mgr->err_thread = thread_no, mgr->err;
-       }
+       } else
+               return mgr->err;
 
        // if librarian slot == found slot, advance to real slot
 
        if( node->type == Librarian ) {
-//       if( !keycmp (ptr, key, keylen) ) {
-               ptr = keyptr(set->page, ++slot);
-               node = slotptr(set->page, slot);
-         }
+               node = slotptr(set->page, ++slot);
+               ptr = keyptr(set->page, slot);
+       }
 
        //  if inserting a duplicate key or unique
        //      key that doesn't exist on the page,
@@ -2756,7 +2705,7 @@ BtVal *val;
                  goto insxit;
 
          if( entry = bt_splitpage (mgr, set, thread_no, 1) )
-           if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
+           if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
                  continue;
 
          return mgr->err_thread = thread_no, mgr->err;
@@ -2800,7 +2749,7 @@ BtVal *val;
          break;
 
        if( entry = bt_splitpage (mgr, set, thread_no, 1) )
-         if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
+         if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
                continue;
 
        return mgr->err_thread = thread_no, mgr->err;
@@ -2833,14 +2782,20 @@ insxit:
 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
 {
 BtKey *key = keyptr(source,src), *ptr;
-unsigned char fence[BT_keyarray];
-uint entry, slot;
+uint slot = locks[src].slot;
+uint entry;
 
        if( locks[src].reuse )
          entry = locks[src-1].entry;
        else
          entry = locks[src].entry;
 
+       if( slot ) {
+               set->latch = mgr->leafsets + entry;
+               set->page = bt_mappage (mgr, set->latch);
+               return slot;
+       }
+
        //      find where our key is located 
        //      on current page or pages split on
        //      same page txn operations.
@@ -2858,9 +2813,6 @@ uint entry, slot;
                }
        } while( entry = set->latch->split );
 
-       ptr = keyptr (set->page, set->page->cnt);
-       memcpy (fence, ptr, ptr->len + 1);
-
        mgr->line = __LINE__, mgr->err = BTERR_atomic;
        return 0;
 }
@@ -2887,7 +2839,7 @@ uint entry, slot;
        if( entry = bt_splitpage (mgr, set, thread_no, 0) )
          latch = mgr->leafsets + entry;
        else
-         return mgr->err_thread = thread_no, mgr->err;
+         return mgr->err;
 
        //      splice right page into split chain
        //      and WriteLock it
@@ -2895,6 +2847,10 @@ uint entry, slot;
        bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
        latch->split = set->latch->split;
        set->latch->split = entry;
+
+       // clear slot number for atomic page
+
+       locks[src].slot = 0;
   }
 
   return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
@@ -3009,7 +2965,7 @@ int type;
 
   if( bt->main )
    while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
-       if( bt_txnpromote (bt) )
+       if( bt_promote (bt) )
          return bt->mgr->err;
 
   // return success
@@ -3021,7 +2977,7 @@ int type;
 
 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
 {
-uint src, idx, samepage, entry;
+uint slot, src, idx, samepage, entry;
 BtPageSet set[1], prev[1];
 unsigned char value[BtId];
 BtLatchSet *latch;
@@ -3047,14 +3003,21 @@ BtVal *val;
          samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
 
        if( !samepage )
-         if( bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
+         if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
                ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
          else
                return mgr->err;
+       else
+         slot = 0;
+
+       if( slot )
+        if( slotptr(set->page, slot)->type == Librarian )
+         slot++;
 
        entry = set->latch - mgr->leafsets;
        locks[src].reuse = samepage;
        locks[src].entry = entry;
+       locks[src].slot = slot;
 
        //      capture current lsn for master page
 
@@ -3227,7 +3190,7 @@ BtVal *val;
 
 //  pick & promote a page into the larger btree
 
-BTERR bt_txnpromote (BtDb *bt)
+BTERR bt_promote (BtDb *bt)
 {
 uint entry, slot, idx;
 BtPageSet set[1];
@@ -3305,58 +3268,126 @@ fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_n
   }
 }
 
+//     find unique key == given key, or first duplicate key in
+//     leaf level and return number of value bytes
+//     or (-1) if not found.
+
+int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
+{
+int ret = -1, type;
+BtPageSet set[1];
+BtSlot *node;
+BtKey *ptr;
+BtVal *val;
+uint slot;
+
+ for( type = 0; type < 2; type++ )
+  if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) {
+       node = slotptr(set->page, slot);
+
+       //      skip librarian slot place holder
+
+       if( node->type == Librarian )
+         node = slotptr(set->page, ++slot);
+
+       ptr = keyptr(set->page, slot);
+
+       //      not there if we reach the stopper key
+       //  or the key doesn't match what's on the page.
+
+       if( slot == set->page->cnt )
+         if( !bt_getid (set->page->right) ) {
+               bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
+               bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+               continue;
+       }
+
+       if( keycmp (ptr, key, keylen) ) {
+               bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
+               bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+               continue;
+       }
+
+       // key matches, return >= 0 value bytes copied
+       //      or -1 if not there.
+
+       if( node->type == Delete || node->dead ) {
+               ret = -1;
+               goto findxit;
+       }
+
+       val = valptr (set->page,slot);
+
+       if( valmax > val->len )
+               valmax = val->len;
+
+       memcpy (value, val->value, valmax);
+       ret = valmax;
+       goto findxit;
+  }
+
+  ret = -1;
+
+findxit:
+  if( type < 2 ) {
+       bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
+       bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+  }
+  return ret;
+}
+
 //     set cursor to highest slot on highest page
 
-uint bt_lastkey (BtDb *bt)
+uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no)
 {
-uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
+uid page_no = bt_getid (mgr->pagezero->alloc->left);
 BtPageSet set[1];
 
-       if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
-               set->page = bt_mappage (bt->mgr, set->latch);
+       if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
+               set->page = bt_mappage (mgr, set->latch);
        else
                return 0;
 
-    bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-       memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
-    bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-       bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
-       return bt->cursor->cnt;
+    bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
+       memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra);
+    bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
+       bt_unpinlatch (set->latch, thread_no, __LINE__);
+       return cursor->cnt;
 }
 
 //     return previous slot on cursor page
 
-uint bt_prevkey (BtDb *bt, uint slot)
+uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no)
 {
-uid cursor_page = bt->cursor->page_no;
+uid cursor_page = cursor->page_no;
 uid ourright, next, us = cursor_page;
 BtPageSet set[1];
 
        if( --slot )
                return slot;
 
-       ourright = bt_getid(bt->cursor->right);
+       ourright = bt_getid(cursor->right);
 
 goleft:
-       if( !(next = bt_getid(bt->cursor->left)) )
+       if( !(next = bt_getid(cursor->left)) )
                return 0;
 
 findourself:
        cursor_page = next;
 
-       if( set->latch = bt_pinleaf (bt->mgr, next, bt->thread_no) )
-               set->page = bt_mappage (bt->mgr, set->latch);
+       if( set->latch = bt_pinleaf (mgr, next, thread_no) )
+               set->page = bt_mappage (mgr, set->latch);
        else
                return 0;
 
-    bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-       memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
-       bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-       bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+    bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
+       memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra);
+       bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
+       bt_unpinlatch (set->latch, thread_no, __LINE__);
        
-       next = bt_getid (bt->cursor->right);
+       next = bt_getid (cursor->right);
 
-       if( bt->cursor->kill )
+       if( cursor->kill )
                goto findourself;
 
        if( next != us )
@@ -3365,7 +3396,7 @@ findourself:
          else
                goto findourself;
 
-       return bt->cursor->cnt;
+       return cursor->cnt;
 }
 
 //  return next slot on cursor page
@@ -3377,12 +3408,12 @@ BtPageSet set[1];
 uid right;
 
   do {
-       right = bt_getid(bt->cursor->right);
+       right = bt_getid(bt->cachecursor->right);
 
-       while( slot++ < bt->cursor->cnt )
-         if( slotptr(bt->cursor,slot)->dead )
+       while( slot++ < bt->cachecursor->cnt )
+         if( slotptr(bt->cachecursor,slot)->dead )
                continue;
-         else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
+         else if( right || (slot < bt->cachecursor->cnt) ) // skip infinite stopper
                return slot;
          else
                break;
@@ -3396,7 +3427,7 @@ uid right;
                return 0;
 
     bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-       memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
+       memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
        bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
        bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
        slot = 0;
@@ -3416,7 +3447,7 @@ uint slot;
        // cache page for retrieval
 
        if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
-         memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
+         memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
        else
          return 0;
 
@@ -3661,55 +3692,43 @@ FILE *in;
                break;
 
        case 's':
-               fprintf(stderr, "started scanning\n");
-               
-               do {
-                       if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
-                               set->page = bt_mappage (bt->mgr, set->latch);
-                       else
-                               fprintf(stderr, "unable to obtain latch"), exit(1);
-
-                       bt_lockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
-                       next = bt_getid (set->page->right);
-
-                       for( slot = 0; slot++ < set->page->cnt; )
-                        if( next || slot < set->page->cnt )
-                         if( !slotptr(set->page, slot)->dead ) {
-                               ptr = keyptr(set->page, slot);
-                               len = ptr->len;
-
-                           if( slotptr(set->page, slot)->type == Duplicate )
-                                       len -= BtId;
-
-                               fwrite (ptr->key, len, 1, stdout);
-                               val = valptr(set->page, slot);
-                               fwrite (val->value, val->len, 1, stdout);
-                               fputc ('\n', stdout);
-                               cnt++;
-                          }
-
-                       bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
-                       bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
-               } while( page_no = next );
+               fprintf(stderr, "started forward scan\n");
+               if( slot = bt_startkey (bt, NULL, 0) )
+                 do {
+                       if( slotptr(bt->cachecursor, slot)->type == Librarian )
+                         slot++;
+
+                       ptr = keyptr(bt->cachecursor, slot);
+                       len = ptr->len;
+
+                       if( slotptr(bt->cachecursor, slot)->type == Duplicate )
+                               len -= BtId;
+
+                       fwrite (ptr->key, len, 1, stdout);
+                       val = valptr(bt->cachecursor, slot);
+                       fwrite (val->value, val->len, 1, stdout);
+                       fputc ('\n', stdout);
+                       cnt++;
+                } while( slot = bt_nextkey (bt, slot) );
 
                fprintf(stderr, " Total keys read %d\n", cnt);
                break;
 
        case 'r':
                fprintf(stderr, "started reverse scan\n");
-               if( slot = bt_lastkey (bt) )
-                  while( slot = bt_prevkey (bt, slot) ) {
-                       if( slotptr(bt->cursor, slot)->dead )
+               if( slot = bt_lastkey (bt->mgr, bt->cachecursor, bt->thread_no) )
+                  while( slot = bt_prevkey (bt->mgr, bt->cachecursor, slot, bt->thread_no) ) {
+                       if( slotptr(bt->cachecursor, slot)->dead )
                          continue;
 
-                       ptr = keyptr(bt->cursor, slot);
+                       ptr = keyptr(bt->cachecursor, slot);
                        len = ptr->len;
 
-                       if( slotptr(bt->cursor, slot)->type == Duplicate )
+                       if( slotptr(bt->cachecursor, slot)->type == Duplicate )
                                len -= BtId;
 
                        fwrite (ptr->key, len, 1, stdout);
-                       val = valptr(bt->cursor, slot);
+                       val = valptr(bt->cachecursor, slot);
                        fwrite (val->value, val->len, 1, stdout);
                        fputc ('\n', stdout);
                        cnt++;
@@ -3717,28 +3736,6 @@ FILE *in;
 
                fprintf(stderr, " Total keys read %d\n", cnt);
                break;
-
-       case 'c':
-#ifdef unix
-               posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
-#endif
-               fprintf(stderr, "started counting\n");
-               next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
-
-               while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
-                       if( bt_readpage (bt->mgr, bt->cursor, page_no, 1) )
-                               break;
-
-                       if( !bt->cursor->free && !bt->cursor->lvl )
-                               cnt += bt->cursor->act;
-
-                       bt->mgr->reads++;
-                       page_no = next++;
-               }
-               
-               cnt--;  // remove stopper key
-               fprintf(stderr, " Total keys counted %d\n", cnt);
-               break;
        }
 
        bt_close (bt);
@@ -3781,7 +3778,7 @@ BtKey *ptr;
                fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
                fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
                fprintf (stderr, "  where main_file is the name of the main btree file\n");
-               fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
+               fprintf (stderr, "  cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n");
                fprintf (stderr, "  pagebits is the page size in bits for the cache btree\n");
                fprintf (stderr, "  leafbits is the number of xtra bits for a leaf page\n");
                fprintf (stderr, "  poolsize is the number of pages in buffer pool for the cache btree\n");
@@ -3901,9 +3898,9 @@ BtKey *ptr;
        if( main )
                bt_poolaudit(main, "main");
 
-       fprintf(stderr, "cache %d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
+       fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found);
        if( main )
-               fprintf(stderr, "main %d reads %d writes %d found\n", main->reads, main->writes, main->found);
+               fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
 
        if( main )
                bt_mgrclose (main);
@@ -3917,4 +3914,13 @@ BtKey *ptr;
        fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
 }
 
+BtKey *bt_key (BtPage page, uint slot)
+{
+return keyptr(page,slot);
+}
+
+BtSlot *bt_slot (BtPage page, uint slot)
+{
+return slotptr(page,slot);
+}
 #endif //STANDALONE