]> pd.if.org Git - btree/commitdiff
Add finalized threadskv10g LSM btree.
authorunknown <karl@E04.petzent.com>
Wed, 29 Oct 2014 18:55:33 +0000 (11:55 -0700)
committerunknown <karl@E04.petzent.com>
Wed, 29 Oct 2014 18:55:33 +0000 (11:55 -0700)
README.md
threadskv10g.c [moved from threadskv10f.c with 90% similarity]

index 9ea76b242a368c66d5aa7aa5f5a062e794572b95..e4c2ce792dc5e9f020ee635edc8df570036e8cdc 100644 (file)
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
 Btree-source-code
 =================
 
-A working project for High-concurrency B-tree source code in C.  You probably want to download threadskv8.c for the latest developement version.
+A working project for High-concurrency B-tree source code in C.  You probably want to download threadskv10g.c for the latest developement version.
 
 Here are files in the btree source code:
 
@@ -35,6 +35,8 @@ threadskv7.c  Multi-Threaded/Single-Process with atomic add of a set of keys unde
 
 threadskv8.c   Multi-Threaded/Single-Process with atomic-consistent add of a set of keys based on threadskv6.c.  Uses btree page latches as locking granularity.
 
+threadskv10g.c Multi-Threaded/Single-Process with 2 Log-Structured-Merge (LSM) btrees based on threadskv8.c. Also adds dual leaf/interior node page sizes.
+
 Compilation is achieved on linux or Windows by:
 
 gcc -D STANDALONE -O3 threadskv8.c -lpthread
similarity index 90%
rename from threadskv10f.c
rename to threadskv10g.c
index fe6f3691f91ce8868081ab98aa6825aefc8bc5d2..4e11b5b7f6ec4715e8d7db27eeb337db112a1df0 100644 (file)
@@ -1,4 +1,4 @@
-// btree version threadskv10f futex version
+// btree version threadskv10g futex version
 //     with reworked bt_deletekey code,
 //     phase-fair re-entrant reader writer lock,
 //     librarian page split code,
@@ -9,7 +9,7 @@
 //     redo log for failure recovery
 //     LSM B-trees for write optimization
 //     larger sized leaf pages than non-leaf
-//     and LSM B-tree find operations
+//     and LSM B-tree find & count operations
 
 // 28 OCT 2014
 
@@ -308,13 +308,21 @@ typedef struct {
 typedef struct {
        BtMgr *mgr;                                     // buffer manager for entire process
        BtMgr *main;                            // buffer manager for main btree
-       BtPage cachecursor;                     // cached page frame for cache btree
-       BtPage maincursor;                      // cached page frame for main btree
+       BtPageSet cacheset[1];  // cached page frame for cache btree
+       BtPageSet mainset[1];   // cached page frame for main btree
+       uint cacheslot;                         // slot number in cacheset
+       uint mainslot;                          // slot number in mainset
+       ushort phase;                           // 1 = main btree 0 = cache btree 2 = both
        ushort thread_no;                       // thread number
-       unsigned char key[BT_keyarray]; // last found complete key
+       BtSlot *cachenode;
+       BtSlot *mainnode;
+       BtKey *cachekey;
+       BtKey *mainkey;
+       BtVal *cacheval;
+       BtVal *mainval;
 } BtDb;
 
-//     atomic txn structures
+//     atomic txn structure
 
 typedef struct {
        logseqno reqlsn;        // redo log seq no required
@@ -375,10 +383,11 @@ extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl,
 
 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
 
-extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
-extern uint bt_nextkey   (BtDb *bt, uint slot);
-extern uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no);
-extern uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no);
+extern BTERR bt_startkey (BtDb *db, unsigned char *key, uint len);
+extern BTERR bt_nextkey (BtDb *bt);
+
+extern uint bt_lastkey (BtDb *bt);
+extern uint bt_prevkey (BtDb *bt);
 
 //     manager functions
 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
@@ -1296,17 +1305,6 @@ uint slot;
 
 void bt_close (BtDb *bt)
 {
-#ifdef unix
-       if( bt->cachecursor )
-               free (bt->cachecursor);
-       if( bt->maincursor )
-               free (bt->maincursor);
-#else
-       if( bt->cachecursor)
-               VirtualFree (bt->cachecursor, 0, MEM_RELEASE);
-       if( bt->maincursor)
-               VirtualFree (bt->maincursor, 0, MEM_RELEASE);
-#endif
        free (bt);
 }
 
@@ -1535,13 +1533,6 @@ BtDb *bt = malloc (sizeof(*bt));
        memset (bt, 0, sizeof(*bt));
        bt->main = main;
        bt->mgr = mgr;
-#ifdef unix
-       bt->cachecursor = valloc (mgr->page_size << mgr->leaf_xtra);
-       bt->maincursor = valloc (mgr->page_size << mgr->leaf_xtra);
-#else
-       bt->cachecursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
-       bt->maincursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
-#endif
 #ifdef unix
        bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
 #else
@@ -2053,7 +2044,7 @@ BtKey *ptr;
          bt_putid (temp->page->left, set->latch->page_no);
          bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
          bt_unpinlatch (temp->latch, thread_no, __LINE__);
-       } else {        // our page is now rightmost
+       } else if( !lvl ) {     // our page is now rightmost leaf
          bt_mutexlock (mgr->lock);
          bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
          bt_releasemutex(mgr->lock);
@@ -2125,10 +2116,6 @@ BtVal *val;
                val = valptr(set->page,slot);
                set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
                set->page->act--;
-
-               //  mark node type as delete
-
-               node->type = Delete;
                node->dead = 1;
 
                // collapse empty slots beneath the fence
@@ -2438,7 +2425,7 @@ uint prev;
          bt_putid (temp->page->left, right->latch->page_no);
          bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
          bt_unpinlatch (temp->latch, thread_no, __LINE__);
-        } else {       // page is rightmost
+        } else if( !lvl ) {    // page is rightmost leaf
          bt_mutexlock (mgr->lock);
          bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
          bt_releasemutex(mgr->lock);
@@ -2530,7 +2517,7 @@ uid right2;
          bt_putid (temp->page->left, right->page_no);
          bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
          bt_unpinlatch (temp->latch, thread_no, __LINE__);
-       } else {  // right page is far right page
+       } else if( !lvl ) {  // right page is far right page
          bt_mutexlock (mgr->lock);
          bt_putid (mgr->pagezero->alloc->left, right->page_no);
          bt_releasemutex(mgr->lock);
@@ -3336,124 +3323,272 @@ findxit:
   return ret;
 }
 
-//     set cursor to highest slot on highest page
+//     set cursor to highest slot on left-most page
 
-uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no)
+BTERR bt_lastkey (BtDb *bt)
 {
-uid page_no = bt_getid (mgr->pagezero->alloc->left);
-BtPageSet set[1];
+uid cache_page_no = bt_getid (bt->mgr->pagezero->alloc->left);
+uid main_page_no = bt_getid (bt->main->pagezero->alloc->left);
 
-       if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
-               set->page = bt_mappage (mgr, set->latch);
+       if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) )
+               bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
        else
-               return 0;
+               return bt->mgr->err;
 
-    bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
-       memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra);
-    bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
-       bt_unpinlatch (set->latch, thread_no, __LINE__);
-       return cursor->cnt;
+    bt_lockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
+       bt->cacheslot = bt->cacheset->page->cnt;
+
+       if( bt->mainset->latch = bt_pinleaf (bt->main, main_page_no, bt->thread_no) )
+               bt->mainset->page = bt_mappage (bt->main, bt->mainset->latch);
+       else
+               return bt->main->err;
+
+    bt_lockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
+       bt->mainslot = bt->mainset->page->cnt;
+       bt->phase = 2;
+       return 0;
 }
 
 //     return previous slot on cursor page
 
-uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no)
+uint bt_prevslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
 {
-uid cursor_page = cursor->page_no;
-uid ourright, next, us = cursor_page;
-BtPageSet set[1];
+uid next, us = set->latch->page_no;
 
-       if( --slot )
+  while( 1 ) {
+       while( --slot )
+         if( slotptr(set->page, slot)->dead )
+               continue;
+         else
                return slot;
 
-       ourright = bt_getid(cursor->right);
+       next = bt_getid(set->page->left);
 
-goleft:
-       if( !(next = bt_getid(cursor->left)) )
+       if( !next )
                return 0;
 
-findourself:
-       cursor_page = next;
+       do {
+         bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
+         bt_unpinlatch (set->latch, thread_no, __LINE__);
 
-       if( set->latch = bt_pinleaf (mgr, next, thread_no) )
-               set->page = bt_mappage (mgr, set->latch);
+         if( set->latch = bt_pinleaf (mgr, next, thread_no) )
+           set->page = bt_mappage (mgr, set->latch);
+         else
+           return 0;
+
+      bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
+         next = bt_getid (set->page->right);
+
+       } while( next != us );
+
+    slot = set->page->cnt + 1;
+  }
+}
+
+//  advance to previous key
+
+BTERR bt_prevkey (BtDb *bt)
+{
+int cmp;
+
+  // first advance last key(s) one previous slot
+
+  while( 1 ) {
+       switch( bt->phase ) {
+       case 0:
+         bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
+         break;
+       case 1:
+         bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
+         break;
+       case 2:
+         bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
+         bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
+       break;
+       }
+
+  // return next key
+
+       if( bt->cacheslot ) {
+         bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
+         bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
+         bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
+       }
+
+       if( bt->mainslot ) {
+         bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
+         bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
+         bt->mainval = valptr(bt->mainset->page, bt->mainslot);
+       }
+
+       if( bt->mainslot && bt->cacheslot )
+         cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
+       else if( bt->cacheslot )
+         cmp = 1;
+       else if( bt->mainslot )
+         cmp = -1;
        else
-               return 0;
+         return 0;
 
-    bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
-       memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra);
-       bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
-       bt_unpinlatch (set->latch, thread_no, __LINE__);
-       
-       next = bt_getid (cursor->right);
+       //  cache key is larger
 
-       if( cursor->kill )
-               goto findourself;
+       if( cmp > 0 ) {
+         bt->phase = 0;
+         if( bt->cachenode->type == Delete )
+               continue;
+         return bt->cacheslot;
+       }
 
-       if( next != us )
-         if( next == ourright )
-               goto goleft;
-         else
-               goto findourself;
+       //  main key is larger
 
-       return cursor->cnt;
+       if( cmp < 0 ) {
+         bt->phase = 1;
+         return bt->mainslot;
+       }
+
+       //      keys are equal
+
+       bt->phase = 2;
+
+       if( bt->cachenode->type == Delete )
+               continue;
+
+       return bt->cacheslot;
+  }
 }
 
-//  return next slot on cursor page
-//  or slide cursor right into next page
+//     advance to next slot in cache or main btree
+//     return 0 for EOF/error
 
-uint bt_nextkey (BtDb *bt, uint slot)
+uint bt_nextslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
 {
-BtPageSet set[1];
-uid right;
-
-  do {
-       right = bt_getid(bt->cachecursor->right);
+BtPage page;
+uid page_no;
 
-       while( slot++ < bt->cachecursor->cnt )
-         if( slotptr(bt->cachecursor,slot)->dead )
+  while( 1 ) {
+       while( slot++ < set->page->cnt )
+         if( slotptr(set->page, slot)->dead )
                continue;
-         else if( right || (slot < bt->cachecursor->cnt) ) // skip infinite stopper
+         else if( slot < set->page->cnt || bt_getid (set->page->right) )
                return slot;
          else
-               break;
+               return 0;
 
-       if( !right )
-               break;
+       bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
+       bt_unpinlatch (set->latch, thread_no, __LINE__);
 
-       if( set->latch = bt_pinleaf (bt->mgr, right, bt->thread_no) )
-               set->page = bt_mappage (bt->mgr, set->latch);
-       else
+       if( page_no = bt_getid(set->page->right) )
+         if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
+               set->page = bt_mappage (mgr, set->latch);
+         else
                return 0;
+       else
+         return 0; // EOF
 
-    bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-       memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
-       bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-       bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+       // obtain access lock using lock chaining with Access mode
+
+       bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
+       bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
+       bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
        slot = 0;
+  }
+}
+
+//  advance to next key
+
+BTERR bt_nextkey (BtDb *bt)
+{
+int cmp;
+
+  // first advance last key(s) one next slot
+
+  while( 1 ) {
+       switch( bt->phase ) {
+       case 0:
+         bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
+         break;
+       case 1:
+         bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
+         break;
+       case 2:
+         bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
+         bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
+       break;
+       }
+
+  // return next key
+
+       if( bt->cacheslot ) {
+         bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
+         bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
+         bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
+       }
+
+       if( bt->mainslot ) {
+         bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
+         bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
+         bt->mainval = valptr(bt->mainset->page, bt->mainslot);
+       }
+
+       if( bt->mainslot && bt->cacheslot )
+         cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
+       else if( bt->mainslot )
+         cmp = 1;
+       else if( bt->cacheslot )
+         cmp = -1;
+       else
+         return 0;
 
-  } while( 1 );
+       //  main key is larger
 
-  return bt->mgr->err = 0;
+       if( cmp < 0 ) {
+         bt->phase = 0;
+         if( bt->cachenode->type == Delete )
+               continue;
+         return bt->cacheslot;
+       }
+
+       //  cache key is larger
+
+       if( cmp > 0 ) {
+         bt->phase = 1;
+         return bt->mainslot;
+       }
+
+       //      keys are equal
+
+       bt->phase = 2;
+
+       if( bt->cachenode->type == Delete )
+               continue;
+
+       return bt->cacheslot;
+  }
 }
 
-//  cache page of keys into cursor and return starting slot for given key
+//  start sweep of keys
 
-uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
+BTERR bt_startkey (BtDb *bt, unsigned char *key, uint len)
 {
 BtPageSet set[1];
 uint slot;
 
-       // cache page for retrieval
+       // cache btree page
 
-       if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
-         memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
+       if( slot = bt_loadpage (bt->mgr, bt->cacheset, key, len, 0, BtLockRead, bt->thread_no) )
+               bt->cacheslot = slot - 1;
        else
-         return 0;
+         return bt->mgr->err;
 
-       bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
-       bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
-       return slot;
+       // main btree page
+
+       if( slot = bt_loadpage (bt->main, bt->mainset, key, len, 0, BtLockRead, bt->thread_no) )
+               bt->mainslot = slot - 1;
+       else
+         return bt->mgr->err;
+
+       bt->phase = 2;
+       return 0;
 }
 
 #ifdef STANDALONE
@@ -3571,22 +3706,23 @@ void *index_file (void *arg)
 uint __stdcall index_file (void *arg)
 #endif
 {
-int line = 0, found = 0, cnt = 0, idx;
+int line = 0, found = 0, cnt = 0, cachecnt, idx;
 uid next, page_no = LEAF_page; // start on first page of leaves
 int ch, len = 0, slot, type = 0;
 unsigned char key[BT_maxkey];
-unsigned char txn[65536];
+unsigned char buff[65536];
+uint nxt = sizeof(buff);
 ThreadArg *args = arg;
 BtPage page, frame;
 BtPageSet set[1];
-uint nxt = 65536;
+int vallen;
 BtKey *ptr;
 BtVal *val;
 BtDb *bt;
 FILE *in;
 
        bt = bt_open (args->mgr, args->main);
-       page = (BtPage)txn;
+       page = (BtPage)buff;
 
        if( args->idx < strlen (args->type) )
                ch = args->type[args->idx];
@@ -3627,13 +3763,13 @@ FILE *in;
                          }
 
                          nxt -= len - 10;
-                         memcpy (txn + nxt, key + 10, len - 10);
+                         memcpy (buff + nxt, key + 10, len - 10);
                          nxt -= 1;
-                         txn[nxt] = len - 10;
+                         buff[nxt] = len - 10;
                          nxt -= 10;
-                         memcpy (txn + nxt, key, 10);
+                         memcpy (buff + nxt, key, 10);
                          nxt -= 1;
-                         txn[nxt] = 10;
+                         buff[nxt] = 10;
                          slotptr(page,++cnt)->off  = nxt;
                          slotptr(page,cnt)->type = type;
                          len = 0;
@@ -3647,7 +3783,7 @@ FILE *in;
 
                          if( bt_atomictxn (bt, page) )
                                fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
-                         nxt = sizeof(txn);
+                         nxt = sizeof(buff);
                          cnt = 0;
 
                        }
@@ -3693,49 +3829,120 @@ FILE *in;
 
        case 's':
                fprintf(stderr, "started forward scan\n");
-               if( slot = bt_startkey (bt, NULL, 0) )
-                 do {
-                       if( slotptr(bt->cachecursor, slot)->type == Librarian )
-                         slot++;
+               if( bt_startkey (bt, NULL, 0) )
+                       fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
+
+               while( bt_nextkey (bt) ) {
+                 if( bt->phase == 1 ) {
+                       len = bt->mainkey->len;
+
+                       if( bt->mainnode->type == Duplicate )
+                               len -= BtId;
 
-                       ptr = keyptr(bt->cachecursor, slot);
-                       len = ptr->len;
+                       fwrite (bt->mainkey->key, len, 1, stdout);
+                       fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
+                 } else {
+                       len = bt->cachekey->len;
 
-                       if( slotptr(bt->cachecursor, slot)->type == Duplicate )
+                       if( bt->cachenode->type == Duplicate )
                                len -= BtId;
 
-                       fwrite (ptr->key, len, 1, stdout);
-                       val = valptr(bt->cachecursor, slot);
-                       fwrite (val->value, val->len, 1, stdout);
-                       fputc ('\n', stdout);
-                       cnt++;
-                } while( slot = bt_nextkey (bt, slot) );
+                       fwrite (bt->cachekey->key, len, 1, stdout);
+                       fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
+                 }
+
+                 fputc ('\n', stdout);
+                 cnt++;
+           }
+
+               bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
+               bt_unpinlatch (bt->cacheset->latch, bt->thread_no, __LINE__);
+
+               bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
+               bt_unpinlatch (bt->mainset->latch, bt->thread_no, __LINE__);
 
                fprintf(stderr, " Total keys read %d\n", cnt);
                break;
 
        case 'r':
                fprintf(stderr, "started reverse scan\n");
-               if( slot = bt_lastkey (bt->mgr, bt->cachecursor, bt->thread_no) )
-                  while( slot = bt_prevkey (bt->mgr, bt->cachecursor, slot, bt->thread_no) ) {
-                       if( slotptr(bt->cachecursor, slot)->dead )
-                         continue;
+               if( bt_lastkey (bt) )
+                       fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
 
-                       ptr = keyptr(bt->cachecursor, slot);
-                       len = ptr->len;
+               while( bt_prevkey (bt) ) {
+                 if( bt->phase == 1 ) {
+                       len = bt->mainkey->len;
 
-                       if( slotptr(bt->cachecursor, slot)->type == Duplicate )
+                       if( bt->mainnode->type == Duplicate )
                                len -= BtId;
 
-                       fwrite (ptr->key, len, 1, stdout);
-                       val = valptr(bt->cachecursor, slot);
-                       fwrite (val->value, val->len, 1, stdout);
-                       fputc ('\n', stdout);
-                       cnt++;
+                       fwrite (bt->mainkey->key, len, 1, stdout);
+                       fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
+                 } else {
+                       len = bt->cachekey->len;
+
+                       if( bt->cachenode->type == Duplicate )
+                               len -= BtId;
+
+                       fwrite (bt->cachekey->key, len, 1, stdout);
+                       fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
                  }
 
+                 fputc ('\n', stdout);
+                 cnt++;
+           }
+
+               bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
+               bt_unpinlatch (bt->cacheset->latch, bt->thread_no, __LINE__);
+
+               bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
+               bt_unpinlatch (bt->mainset->latch, bt->thread_no, __LINE__);
+
                fprintf(stderr, " Total keys read %d\n", cnt);
                break;
+
+       case 'c':
+               fprintf(stderr, "started counting LSM cache btree\n");
+               page_no = LEAF_page;
+
+               do {
+                 if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
+                       set->page = bt_mappage (bt->mgr, set->latch);
+                 else
+                       return 0;
+
+                 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
+                 cnt += set->page->act;
+                 page_no = bt_getid (set->page->right);
+                 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
+                 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+               } while( page_no );
+               
+               cachecnt = --cnt;       // remove stopper key
+               cnt = 0;
+
+               fprintf(stderr, "started counting LSM main btree\n");
+               page_no = LEAF_page;
+
+               do {
+                 if( set->latch = bt_pinleaf (bt->main, page_no, bt->thread_no) )
+                       set->page = bt_mappage (bt->main, set->latch);
+                 else
+                       return 0;
+
+                 bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
+                 cnt += set->page->act;
+                 page_no = bt_getid (set->page->right);
+                 bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
+                 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+               } while( page_no );
+               
+               cnt--;  // remove stopper key
+
+               fprintf(stderr, " cache keys counted %d\n", cachecnt);
+               fprintf(stderr, " main keys counted %d\n", cnt);
+               fprintf(stderr, " Total keys counted %d\n", cnt + cachecnt);
+               break;
        }
 
        bt_close (bt);