]> pd.if.org Git - btree/blobdiff - threadskv8.c
Fix bug in reverse scan. New command line interface with a command per input file
[btree] / threadskv8.c
index 44793a09d99365c97979c5bd79d44905ff555283..91ec929670d8fd4ed6a27cd4fe040f8c9c038ec5 100644 (file)
@@ -112,6 +112,13 @@ typedef struct {
        ushort serving[1];
 } RWLock;
 
+//     write only queue lock
+
+typedef struct {
+       ushort ticket[1];
+       ushort serving[1];
+} WOLock;
+
 #define PHID 0x1
 #define PRES 0x2
 #define MASK 0x3
@@ -147,19 +154,14 @@ typedef struct {
        uid page_no;                    // latch set page number
        RWLock readwr[1];               // read/write page lock
        RWLock access[1];               // Access Intent/Page delete
-       RWLock parent[1];               // Posting of fence key in parent
-       RWLock atomic[1];               // Atomic update in progress
+       WOLock parent[1];               // Posting of fence key in parent
+       WOLock atomic[1];               // Atomic update in progress
        uint split;                             // right split page atomic insert
        uint entry;                             // entry slot in latch table
        uint next;                              // next entry in hash table chain
        uint prev;                              // prev entry in hash table chain
        volatile ushort pin;    // number of outstanding threads
        ushort dirty:1;                 // page in cache is dirty
-#ifdef unix
-       pthread_t atomictid;    // pid holding atomic lock
-#else
-       uint atomictid;                 // pid holding atomic lock
-#endif
 } BtLatchSet;
 
 //     Define the length of the page record numbers
@@ -189,7 +191,8 @@ typedef enum {
        Unique,
        Librarian,
        Duplicate,
-       Delete
+       Delete,
+       Update
 } BtSlotType;
 
 typedef struct {
@@ -404,6 +407,31 @@ uid bt_newdup (BtDb *bt)
 #endif
 }
 
+//     Write-Only Queue Lock
+
+void WriteOLock (WOLock *lock)
+{
+ushort tix;
+#ifdef unix
+       tix = __sync_fetch_and_add (lock->ticket, 1);
+#else
+       tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
+#endif
+       // wait for our ticket to come up
+
+       while( tix != lock->serving[0] )
+#ifdef unix
+               sched_yield();
+#else
+               SwitchToThread ();
+#endif
+}
+
+void WriteORelease (WOLock *lock)
+{
+       lock->serving[0]++;
+}
+
 //     Phase-Fair reader/writer lock implementation
 
 void WriteLock (RWLock *lock)
@@ -652,7 +680,6 @@ BtLatchSet *latch = bt->mgr->latchsets + slot;
                bt->mgr->latchsets[latch->next].prev = slot;
 
        bt->mgr->hashtable[hashidx].slot = slot;
-       memset (&latch->atomictid, 0, sizeof(latch->atomictid));
        latch->page_no = page_no;
        latch->entry = slot;
        latch->split = 0;
@@ -1134,14 +1161,10 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch)
                WriteLock (latch->access);
                break;
        case BtLockParent:
-               WriteLock (latch->parent);
+               WriteOLock (latch->parent);
                break;
        case BtLockAtomic:
-               WriteLock (latch->atomic);
-               break;
-       case BtLockAtomic | BtLockRead:
-               WriteLock (latch->atomic);
-               ReadLock (latch->readwr);
+               WriteOLock (latch->atomic);
                break;
        }
 }
@@ -1164,16 +1187,10 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch)
                WriteRelease (latch->access);
                break;
        case BtLockParent:
-               WriteRelease (latch->parent);
+               WriteORelease (latch->parent);
                break;
        case BtLockAtomic:
-               memset (&latch->atomictid, 0, sizeof(latch->atomictid));
-               WriteRelease (latch->atomic);
-               break;
-       case BtLockAtomic | BtLockRead:
-               ReadRelease (latch->readwr);
-               memset (&latch->atomictid, 0, sizeof(latch->atomictid));
-               WriteRelease (latch->atomic);
+               WriteORelease (latch->atomic);
                break;
        }
 }
@@ -1293,20 +1310,10 @@ uint mode, prevmode;
          prevpage = 0;
        }
 
-       //      skip Atomic lock on leaf page we need to slide over
-
-       if( !drill )
-        if( mode & BtLockAtomic )
-         if( pthread_equal (set->latch->atomictid, pthread_self()) )
-               mode &= ~BtLockAtomic;
-
        // obtain mode lock using lock chaining through AccessLock
 
        bt_lockpage(mode, set->latch);
 
-       if( mode & BtLockAtomic )
-               set->latch->atomictid = pthread_self();
-
        if( set->page->free )
                return bt->err = BTERR_struct, 0;
 
@@ -1342,11 +1349,13 @@ uint mode, prevmode;
          if( drill == lvl )
                return slot;
 
+         // find next non-dead slot -- the fence key if nothing else
+
          while( slotptr(set->page, slot)->dead )
                if( slot++ < set->page->cnt )
-                       continue;
+                 continue;
                else
-                       goto slideright;
+                 return bt->err = BTERR_struct, 0;
 
          page_no = bt_getid(valptr(set->page, slot)->value);
          drill--;
@@ -2258,11 +2267,87 @@ typedef struct {
 typedef struct {
        uid page_no;            // page number for split leaf
        void *next;                     // next key to insert
-       uint entry:30;          // latch table entry number
+       uint entry:29;          // latch table entry number
        uint type:2;            // 0 == insert, 1 == delete, 2 == free
+       uint nounlock:1;        // don't unlock ParentModification
        unsigned char leafkey[BT_keyarray];
 } AtomicKey;
 
+//  find and load leaf page for given key
+//     leave page Atomic locked and Read locked.
+
+int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
+{
+BtLatchSet *prevlatch;
+uid page_no;
+uint slot;
+
+  //  find level zero page
+
+  if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
+       return 0;
+
+  // find next non-dead entry on this page
+  //   it will be the fence key if nothing else
+
+  while( slotptr(set->page, slot)->dead )
+       if( slot++ < set->page->cnt )
+         continue;
+       else
+         return bt->err = BTERR_struct, 0;
+
+  page_no = bt_getid(valptr(set->page, slot)->value);
+  prevlatch = set->latch;
+
+  while( page_no ) {
+       if( set->latch = bt_pinlatch (bt, page_no, 1) )
+         set->page = bt_mappage (bt, set->latch);
+       else
+         return 0;
+
+       if( set->page->free || set->page->lvl )
+               return bt->err = BTERR_struct, 0;
+
+       // obtain read lock using lock chaining with Access mode
+       //      release & unpin parent/left sibling page
+
+       bt_lockpage(BtLockAccess, set->latch);
+
+       bt_unlockpage(BtLockRead, prevlatch);
+       bt_unpinlatch (prevlatch);
+
+       bt_lockpage(BtLockRead, set->latch);
+       bt_unlockpage(BtLockAccess, set->latch);
+
+       //  find key on page at this level
+       //  and descend to requested level
+
+       if( !set->page->kill )
+        if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
+         bt_unlockpage(BtLockRead, set->latch);
+         bt_lockpage(BtLockAtomic, set->latch);
+         bt_lockpage(BtLockAccess, set->latch);
+         bt_lockpage(BtLockRead, set->latch);
+         bt_unlockpage(BtLockAccess, set->latch);
+
+         if( slot = bt_findslot (set->page, key, len) )
+               return slot;
+
+         bt_unlockpage(BtLockAtomic, set->latch);
+         }
+
+       //  slide right into next page
+
+       page_no = bt_getid(set->page->right);
+       prevlatch = set->latch;
+  }
+
+  // return error on end of right chain
+
+  bt->err = BTERR_struct;
+  return 0;    // return error
+}
+
 //     determine actual page where key is located
 //  return slot number
 
@@ -2352,6 +2437,18 @@ BtVal *val;
        return 0;
 }
 
+uint bt_parentmatch (AtomicKey *head, uint entry)
+{
+uint cnt = 0;
+
+       if( head )
+         do if( head->entry == entry )
+               head->nounlock = 1, cnt++;
+         while( head = head->next );
+
+       return cnt;
+}
+
 //     atomic modification of a batch of keys.
 
 //     return -1 if BTERR is set
@@ -2415,7 +2512,7 @@ int type;
                bt_unlockpage(BtLockRead, set->latch); 
 
        if( !slot )
-         if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) )
+         if( slot = bt_atomicload(bt, set, key->key, key->len) )
                set->latch->split = 0;
          else
                return -1;
@@ -2551,12 +2648,11 @@ int type;
          //  schedule prev fence key update
 
          ptr = keyptr(prev->page,prev->page->cnt);
-         leaf = malloc (sizeof(AtomicKey));
+         leaf = calloc (sizeof(AtomicKey), 1);
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
          leaf->page_no = prev->latch->page_no;
          leaf->entry = prev->latch->entry;
-         leaf->next = NULL;
          leaf->type = 0;
 
          if( tail )
@@ -2572,11 +2668,11 @@ int type;
          *prev = *set;
        }
 
-       //  update left pointer in next right page
-       //      if we did any now non-empty splits
+       //  update left pointer in next right page from last split page
+       //      (if all splits were reversed, latch->split == 0)
 
        if( latch->split ) {
-         //  fix left pointer in master's original right sibling
+         //  fix left pointer in master's original (now split) right sibling
          //    or set rightmost page in page zero
 
          if( right = bt_getid (prev->page->right) ) {
@@ -2590,18 +2686,20 @@ int type;
                set->latch->dirty = 1;
            bt_unlockpage (BtLockWrite, set->latch);
                bt_unpinlatch (set->latch);
-         } else
+         } else {      // prev is rightmost page
+           bt_spinwritelock (bt->mgr->lock);
                bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
+           bt_spinreleasewrite(bt->mgr->lock);
+         }
 
          //  Process last page split in chain
 
          ptr = keyptr(prev->page,prev->page->cnt);
-         leaf = malloc (sizeof(AtomicKey));
+         leaf = calloc (sizeof(AtomicKey), 1);
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
          leaf->page_no = prev->latch->page_no;
          leaf->entry = prev->latch->entry;
-         leaf->next = NULL;
          leaf->type = 0;
   
          if( tail )
@@ -2613,6 +2711,9 @@ int type;
 
          bt_lockpage(BtLockParent, prev->latch);
          bt_unlockpage(BtLockWrite, prev->latch);
+
+         //  remove atomic lock on master page
+
          bt_unlockpage(BtLockAtomic, latch);
          continue;
        }
@@ -2626,18 +2727,18 @@ int type;
          continue;
        }
 
-       // any splits were reversed, and the
+       // any and all splits were reversed, and the
        // master page located in prev is empty, delete it
        // by pulling over master's right sibling.
-       // Delete empty master's fence
+
+       // Delete empty master's fence key
 
        ptr = keyptr(prev->page,prev->page->cnt);
-       leaf = malloc (sizeof(AtomicKey));
+       leaf = calloc (sizeof(AtomicKey), 1);
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
        leaf->page_no = prev->latch->page_no;
        leaf->entry = prev->latch->entry;
-       leaf->next = NULL;
        leaf->type = 1;
   
        if( tail )
@@ -2650,37 +2751,51 @@ int type;
        bt_lockpage(BtLockParent, prev->latch);
        bt_unlockpage(BtLockWrite, prev->latch);
 
+       // grab master's right sibling
+       //      note that the far right page never empties because
+       //      it always contains (at least) the infinite stopper key
+       //      and that all pages that don't contain any keys have
+       //      been deleted, or are being deleted under write lock.
+
        if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
                set->page = bt_mappage (bt, set->latch);
        else
                return -1;
 
-       //      add page to our transaction
-
-       bt_lockpage(BtLockAtomic, set->latch);
        bt_lockpage(BtLockWrite, set->latch);
 
-       //      pull contents over empty page
+       //      and pull contents over empty page
+       //      while preserving master's left link
 
        memcpy (set->page->left, prev->page->left, BtId);
        memcpy (prev->page, set->page, bt->mgr->page_size);
 
+       //      forward seekers to old right sibling
+       //      to new page location in prev
+
        bt_putid (set->page->right, prev->latch->page_no);
        set->latch->dirty = 1;
        set->page->kill = 1;
 
-       //      add new parent key for new master page contents
-       //      delete it after parent posts the new master fence.
+       //      add new fence key for new master page contents, delete
+       //      right sibling after parent posts the new master fence.
 
        ptr = keyptr(set->page,set->page->cnt);
-       leaf = malloc (sizeof(AtomicKey));
+       leaf = calloc (sizeof(AtomicKey), 1);
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
        leaf->page_no = prev->latch->page_no;
        leaf->entry = set->latch->entry;
-       leaf->next = NULL;
        leaf->type = 2;
   
+       //  see if right sibling key update is in the FIFO,
+       //      and ParentLock if not there.
+
+       if( !bt_parentmatch (head, leaf->entry) )
+         bt_lockpage(BtLockParent, set->latch);
+
+       bt_unlockpage(BtLockWrite, set->latch);
+
        if( tail )
          tail->next = leaf;
        else
@@ -2688,10 +2803,7 @@ int type;
 
        tail = leaf;
 
-//     bt_lockpage(BtLockParent, set->latch);
-       bt_unlockpage(BtLockWrite, set->latch);
-
-       //  fix master's far right sibling's left pointer
+       //  fix new master's right sibling's left pointer
 
        if( right = bt_getid (set->page->right) ) {
          if( set->latch = bt_pinlatch (bt, right, 1) )
@@ -2703,6 +2815,10 @@ int type;
 
          bt_unlockpage (BtLockWrite, set->latch);
          bt_unpinlatch (set->latch);
+       } else {        // master is now the far right page
+         bt_spinwritelock (bt->mgr->lock);
+         bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
+         bt_spinreleasewrite(bt->mgr->lock);
        }
 
        bt_unlockpage(BtLockAtomic, latch);
@@ -2723,27 +2839,27 @@ int type;
            if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
                  return -1;
 
-           bt_unlockpage (BtLockParent, set->latch);
                break;
 
          case 1:       // delete key
                if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
                  return -1;
 
-           bt_unlockpage (BtLockParent, set->latch);
                break;
 
          case 2:       // insert key & free
            if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
                  return -1;
 
-               bt_unlockpage (BtLockAtomic, set->latch);
                bt_lockpage (BtLockDelete, set->latch);
                bt_lockpage (BtLockWrite, set->latch);
                bt_freepage (bt, set);
                break;
          }
 
+         if( !leaf->nounlock )
+           bt_unlockpage (BtLockParent, set->latch);
+
          bt_unpinlatch (set->latch);
          tail = leaf->next;
          free (leaf);
@@ -2761,7 +2877,6 @@ uint bt_lastkey (BtDb *bt)
 {
 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
 BtPageSet set[1];
-uint slot;
 
        if( set->latch = bt_pinlatch (bt, page_no, 1) )
                set->page = bt_mappage (bt, set->latch);
@@ -2769,13 +2884,12 @@ uint slot;
                return 0;
 
     bt_lockpage(BtLockRead, set->latch);
-
        memcpy (bt->cursor, set->page, bt->mgr->page_size);
-       slot = set->page->cnt;
-
     bt_unlockpage(BtLockRead, set->latch);
        bt_unpinlatch (set->latch);
-       return slot;
+
+       bt->cursor_page = page_no;
+       return bt->cursor->cnt;
 }
 
 //     return previous slot on cursor page
@@ -2809,6 +2923,9 @@ findourself:
        
        next = bt_getid (bt->cursor->right);
 
+       if( bt->cursor->kill )
+               goto findourself;
+
        if( next != us )
          if( next == ourright )
                goto goleft;
@@ -2970,7 +3087,7 @@ uint slot = 0;
                        fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
                memset ((ushort *)latch->access, 0, sizeof(RWLock));
 
-               if( *latch->parent->rin & MASK )
+               if( *latch->parent->ticket != *latch->parent->serving )
                        fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
                memset ((ushort *)latch->parent, 0, sizeof(RWLock));
 
@@ -3003,7 +3120,7 @@ BtKey *ptr;
                        fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
                memset ((ushort *)latch->access, 0, sizeof(RWLock));
 
-               if( *latch->parent->rin & MASK )
+               if( *latch->parent->ticket != *latch->parent->serving )
                        fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
                memset ((ushort *)latch->parent, 0, sizeof(RWLock));
 
@@ -3072,12 +3189,12 @@ void *index_file (void *arg)
 uint __stdcall index_file (void *arg)
 #endif
 {
-int line = 0, found = 0, cnt = 0, unique;
+int line = 0, found = 0, cnt = 0, idx;
 uid next, page_no = LEAF_page; // start on first page of leaves
+int ch, len = 0, slot, type = 0;
 unsigned char key[BT_maxkey];
 unsigned char txn[65536];
 ThreadArg *args = arg;
-int ch, len = 0, slot;
 BtPageSet set[1];
 uint nxt = 65536;
 BtPage page;
@@ -3089,9 +3206,12 @@ FILE *in;
        bt = bt_open (args->mgr);
        page = (BtPage)txn;
 
-       unique = (args->type[1] | 0x20) != 'd';
+       if( args->idx < strlen (args->type) )
+               ch = args->type[args->idx];
+       else
+               ch = args->type[strlen(args->type) - 1];
 
-       switch(args->type[0] | 0x20)
+       switch(ch | 0x20)
        {
        case 'a':
                fprintf(stderr, "started latch mgr audit\n");
@@ -3099,13 +3219,37 @@ FILE *in;
                fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
                break;
 
-       case 't':
-               fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
+       case 'd':
+               type = Delete;
+
+       case 'p':
+               if( !type )
+                       type = Unique;
+
+               if( args->num )
+                if( type == Delete )
+                 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
+                else
+                 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
+               else
+                if( type == Delete )
+                 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
+                else
+                 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
+
                if( in = fopen (args->infile, "rb") )
                  while( ch = getc(in), ch != EOF )
                        if( ch == '\n' )
                        {
                          line++;
+
+                         if( !args->num ) {
+                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
+                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
+                           len = 0;
+                               continue;
+                         }
+
                          nxt -= len - 10;
                          memcpy (txn + nxt, key + 10, len - 10);
                          nxt -= 1;
@@ -3115,7 +3259,7 @@ FILE *in;
                          nxt -= 1;
                          txn[nxt] = 10;
                          slotptr(page,++cnt)->off  = nxt;
-                         slotptr(page,cnt)->type = Unique;
+                         slotptr(page,cnt)->type = type;
                          len = 0;
 
                          if( cnt < args->num )
@@ -3127,7 +3271,7 @@ FILE *in;
 
                          if( bt_atomictxn (bt, page) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
-                         nxt = 65536;
+                         nxt = sizeof(txn);
                          cnt = 0;
 
                        }
@@ -3136,23 +3280,6 @@ FILE *in;
                fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
                break;
 
-       case 'p':
-               fprintf(stderr, "started pennysort for %s\n", args->infile);
-               if( in = fopen (args->infile, "rb") )
-                 while( ch = getc(in), ch != EOF )
-                       if( ch == '\n' )
-                       {
-                         line++;
-
-                         if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
-                               fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
-                         len = 0;
-                       }
-                       else if( len < BT_maxkey )
-                               key[len++] = ch;
-               fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
-               break;
-
        case 'w':
                fprintf(stderr, "started indexing for %s\n", args->infile);
                if( in = fopen (args->infile, "r") )
@@ -3161,13 +3288,7 @@ FILE *in;
                        {
                          line++;
 
-                         if( args->num == 1 )
-                               sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
-
-                         else if( args->num )
-                               sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
-
-                         if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
+                         if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
@@ -3176,33 +3297,6 @@ FILE *in;
                fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
                break;
 
-       case 'd':
-               fprintf(stderr, "started deleting keys for %s\n", args->infile);
-               if( in = fopen (args->infile, "rb") )
-                 while( ch = getc(in), ch != EOF )
-                       if( ch == '\n' )
-                       {
-                         line++;
-                         if( args->num == 1 )
-                               sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
-
-                         else if( args->num )
-                               sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
-
-                         if( bt_findkey (bt, key, len, NULL, 0) < 0 )
-                               fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
-                         ptr = (BtKey*)(bt->key);
-                         found++;
-
-                         if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
-                               fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
-                         len = 0;
-                       }
-                       else if( len < BT_maxkey )
-                               key[len++] = ch;
-               fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
-               break;
-
        case 'f':
                fprintf(stderr, "started finding keys for %s\n", args->infile);
                if( in = fopen (args->infile, "rb") )
@@ -3210,12 +3304,6 @@ FILE *in;
                        if( ch == '\n' )
                        {
                          line++;
-                         if( args->num == 1 )
-                               sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
-
-                         else if( args->num )
-                               sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
-
                          if( bt_findkey (bt, key, len, NULL, 0) == 0 )
                                found++;
                          else if( bt->err )
@@ -3260,6 +3348,29 @@ FILE *in;
                fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
                break;
 
+       case 'r':
+               fprintf(stderr, "started reverse scan\n");
+               if( slot = bt_lastkey (bt) )
+                  while( slot = bt_prevkey (bt, slot) ) {
+                       if( slotptr(bt->cursor, slot)->dead )
+                         continue;
+
+                       ptr = keyptr(bt->cursor, slot);
+                       len = ptr->len;
+
+                       if( slotptr(bt->cursor, slot)->type == Duplicate )
+                               len -= BtId;
+
+                       fwrite (ptr->key, len, 1, stdout);
+                       val = valptr(bt->cursor, slot);
+                       fwrite (val->value, val->len, 1, stdout);
+                       fputc ('\n', stdout);
+                       cnt++;
+                 }
+
+               fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
+               break;
+
        case 'c':
 #ifdef unix
                posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
@@ -3279,7 +3390,7 @@ FILE *in;
                }
                
                cnt--;  // remove stopper key
-               fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
+               fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
                break;
        }
 
@@ -3313,10 +3424,12 @@ BtKey *ptr;
 BtDb *bt;
 
        if( argc < 3 ) {
-               fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
-               fprintf (stderr, "  where page_bits is the page size in bits\n");
+               fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
+               fprintf (stderr, "  where idx_file is the name of the btree file\n");
+               fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
+               fprintf (stderr, "  page_bits is the page size in bits\n");
                fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
-               fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
+               fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
                fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
                exit(0);
        }