]> pd.if.org Git - btree/blobdiff - threadskv8.c
Fix small bug in main when there is less t han one input file
[btree] / threadskv8.c
index 3bae89e89b861f8d99806ef2937a5fada4805f80..9d17a4a6a99cccb611911c7e6fbb91b36d973e36 100644 (file)
@@ -3,10 +3,11 @@
 //     phase-fair reader writer lock,
 //     librarian page split code,
 //     duplicate key management
+//     bi-directional cursors
 //     traditional buffer pool manager
-//     and ACID batched key updates
+//     and ACID batched key-value updates
 
-// 21 SEP 2014
+// 26 SEP 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -133,6 +134,14 @@ volatile typedef struct {
 #define BOTH 3
 #define SHARE 4
 
+//     write only reentrant lock
+
+typedef struct {
+       BtSpinLatch xcl[1];
+       volatile ushort tid[1];
+       volatile ushort dup[1];
+} WOLock;
+
 //  hash table entries
 
 typedef struct {
@@ -146,19 +155,14 @@ typedef struct {
        uid page_no;                    // latch set page number
        RWLock readwr[1];               // read/write page lock
        RWLock access[1];               // Access Intent/Page delete
-       RWLock parent[1];               // Posting of fence key in parent
-       RWLock atomic[1];               // Atomic update in progress
+       WOLock parent[1];               // Posting of fence key in parent
+       WOLock atomic[1];               // Atomic update in progress
        uint split;                             // right split page atomic insert
        uint entry;                             // entry slot in latch table
        uint next;                              // next entry in hash table chain
        uint prev;                              // prev entry in hash table chain
        volatile ushort pin;    // number of outstanding threads
        ushort dirty:1;                 // page in cache is dirty
-#ifdef unix
-       pthread_t atomictid;    // pid holding atomic lock
-#else
-       uint atomictid;                 // pid holding atomic lock
-#endif
 } BtLatchSet;
 
 //     Define the length of the page record numbers
@@ -188,7 +192,8 @@ typedef enum {
        Unique,
        Librarian,
        Duplicate,
-       Delete
+       Delete,
+       Update
 } BtSlotType;
 
 typedef struct {
@@ -235,6 +240,8 @@ typedef struct BtPage_ {
        unsigned char lvl:7;            // level of page
        unsigned char kill:1;           // page is being deleted
        unsigned char right[BtId];      // page number to right
+       unsigned char left[BtId];       // page number to left
+       unsigned char filler[2];        // padding to multiple of 8
 } *BtPage;
 
 //  The loadpage interface object
@@ -269,6 +276,7 @@ typedef struct {
        uint latchtotal;                        // number of page latch entries
        uint latchhash;                         // number of latch hash table slots
        uint latchvictim;                       // next latch entry to examine
+       ushort thread_no[1];            // next thread number
        BtHashEntry *hashtable;         // the buffer pool hash table entries
        BtLatchSet *latchsets;          // mapped latch set from buffer pool
        unsigned char *pagepool;        // mapped to the buffer pool pages
@@ -288,6 +296,7 @@ typedef struct {
        int found;                              // last delete or insert was found
        int err;                                // last error
        int reads, writes;              // number of reads and writes from the btree
+       ushort thread_no;               // thread number
 } BtDb;
 
 typedef enum {
@@ -401,6 +410,41 @@ uid bt_newdup (BtDb *bt)
 #endif
 }
 
+void bt_spinreleasewrite(BtSpinLatch *latch);
+void bt_spinwritelock(BtSpinLatch *latch);
+
+//     Write-Only Reentrant Lock
+
+void WriteOLock (WOLock *lock, ushort tid)
+{
+  while( 1 ) {
+       bt_spinwritelock(lock->xcl);
+
+       if( *lock->tid == tid ) {
+               *lock->dup += 1;
+               bt_spinreleasewrite(lock->xcl);
+               return;
+       }
+       if( !*lock->tid ) {
+               *lock->tid = tid;
+               bt_spinreleasewrite(lock->xcl);
+               return;
+       }
+       bt_spinreleasewrite(lock->xcl);
+       sched_yield();
+  }
+}
+
+void WriteORelease (WOLock *lock)
+{
+       if( *lock->dup ) {
+               *lock->dup -= 1;
+               return;
+       }
+
+       *lock->tid = 0;
+}
+
 //     Phase-Fair reader/writer lock implementation
 
 void WriteLock (RWLock *lock)
@@ -649,7 +693,6 @@ BtLatchSet *latch = bt->mgr->latchsets + slot;
                bt->mgr->latchsets[latch->next].prev = slot;
 
        bt->mgr->hashtable[hashidx].slot = slot;
-       memset (&latch->atomictid, 0, sizeof(latch->atomictid));
        latch->page_no = page_no;
        latch->entry = slot;
        latch->split = 0;
@@ -977,6 +1020,11 @@ BtVal *val;
        pagezero->alloc->bits = mgr->page_bits;
        bt_putid(pagezero->alloc->right, MIN_lvl+1);
 
+       //  initialize left-most LEAF page in
+       //      alloc->left.
+
+       bt_putid (pagezero->alloc->left, LEAF_page);
+
        if( bt_writepage (mgr, pagezero->alloc, 0) ) {
                fprintf (stderr, "Unable to create btree page zero\n");
                return bt_mgrclose (mgr), NULL;
@@ -1083,6 +1131,11 @@ BtDb *bt = malloc (sizeof(*bt));
 #endif
        bt->frame = (BtPage)bt->mem;
        bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
+#ifdef unix
+       bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
+#else
+       bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
+#endif
        return bt;
 }
 
@@ -1110,7 +1163,7 @@ int ans;
 
 // place write, read, or parent lock on requested page_no.
 
-void bt_lockpage(BtLock mode, BtLatchSet *latch)
+void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
 {
        switch( mode ) {
        case BtLockRead:
@@ -1126,13 +1179,13 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch)
                WriteLock (latch->access);
                break;
        case BtLockParent:
-               WriteLock (latch->parent);
+               WriteOLock (latch->parent, bt->thread_no);
                break;
        case BtLockAtomic:
-               WriteLock (latch->atomic);
+               WriteOLock (latch->atomic, bt->thread_no);
                break;
        case BtLockAtomic | BtLockRead:
-               WriteLock (latch->atomic);
+               WriteOLock (latch->atomic, bt->thread_no);
                ReadLock (latch->readwr);
                break;
        }
@@ -1140,7 +1193,7 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch)
 
 // remove write, read, or parent lock on requested page
 
-void bt_unlockpage(BtLock mode, BtLatchSet *latch)
+void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
 {
        switch( mode ) {
        case BtLockRead:
@@ -1156,16 +1209,14 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch)
                WriteRelease (latch->access);
                break;
        case BtLockParent:
-               WriteRelease (latch->parent);
+               WriteORelease (latch->parent);
                break;
        case BtLockAtomic:
-               memset (&latch->atomictid, 0, sizeof(latch->atomictid));
-               WriteRelease (latch->atomic);
+               WriteORelease (latch->atomic);
                break;
        case BtLockAtomic | BtLockRead:
+               WriteORelease (latch->atomic);
                ReadRelease (latch->readwr);
-               memset (&latch->atomictid, 0, sizeof(latch->atomictid));
-               WriteRelease (latch->atomic);
                break;
        }
 }
@@ -1273,37 +1324,27 @@ uint mode, prevmode;
        // obtain access lock using lock chaining with Access mode
 
        if( page_no > ROOT_page )
-         bt_lockpage(BtLockAccess, set->latch);
+         bt_lockpage(bt, BtLockAccess, set->latch);
 
        set->page = bt_mappage (bt, set->latch);
 
-       //      release & unpin parent page
+       //      release & unpin parent or left sibling page
 
        if( prevpage ) {
-         bt_unlockpage(prevmode, prevlatch);
+         bt_unlockpage(bt, prevmode, prevlatch);
          bt_unpinlatch (prevlatch);
          prevpage = 0;
        }
 
-       //      skip Atomic lock on leaf page if already held
-
-       if( !drill )
-        if( mode & BtLockAtomic )
-         if( pthread_equal (set->latch->atomictid, pthread_self()) )
-               mode &= ~BtLockAtomic;
-
        // obtain mode lock using lock chaining through AccessLock
 
-       bt_lockpage(mode, set->latch);
-
-       if( mode & BtLockAtomic )
-               set->latch->atomictid = pthread_self();
+       bt_lockpage(bt, mode, set->latch);
 
        if( set->page->free )
                return bt->err = BTERR_struct, 0;
 
        if( page_no > ROOT_page )
-         bt_unlockpage(BtLockAccess, set->latch);
+         bt_unlockpage(bt, BtLockAccess, set->latch);
 
        // re-read and re-lock root after determining actual level of root
 
@@ -1314,7 +1355,7 @@ uint mode, prevmode;
                drill = set->page->lvl;
 
                if( lock != BtLockRead && drill == lvl ) {
-                 bt_unlockpage(mode, set->latch);
+                 bt_unlockpage(bt, mode, set->latch);
                  bt_unpinlatch (set->latch);
                  continue;
                }
@@ -1327,29 +1368,27 @@ uint mode, prevmode;
        //  find key on page at this level
        //  and descend to requested level
 
-       if( set->page->kill )
-         goto slideright;
-
-       if( slot = bt_findslot (set->page, key, len) ) {
+       if( !set->page->kill )
+        if( slot = bt_findslot (set->page, key, len) ) {
          if( drill == lvl )
                return slot;
 
+         // find next non-dead slot -- the fence key if nothing else
+
          while( slotptr(set->page, slot)->dead )
                if( slot++ < set->page->cnt )
-                       continue;
+                 continue;
                else
-                       goto slideright;
+                 return bt->err = BTERR_struct, 0;
 
          page_no = bt_getid(valptr(set->page, slot)->value);
          drill--;
          continue;
-       }
+        }
 
        //  or slide right into next page
 
-slideright:
        page_no = bt_getid(set->page->right);
-
   } while( page_no );
 
   // return error on end of right chain
@@ -1376,8 +1415,8 @@ void bt_freepage (BtDb *bt, BtPageSet *set)
 
        // unlock released page
 
-       bt_unlockpage (BtLockDelete, set->latch);
-       bt_unlockpage (BtLockWrite, set->latch);
+       bt_unlockpage (bt, BtLockDelete, set->latch);
+       bt_unlockpage (bt, BtLockWrite, set->latch);
        bt_unpinlatch (set->latch);
 
        // unlock allocation page
@@ -1407,8 +1446,8 @@ uint idx;
        ptr = keyptr(set->page, set->page->cnt);
        memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
 
-       bt_lockpage (BtLockParent, set->latch);
-       bt_unlockpage (BtLockWrite, set->latch);
+       bt_lockpage (bt, BtLockParent, set->latch);
+       bt_unlockpage (bt, BtLockWrite, set->latch);
 
        //      insert new (now smaller) fence key
 
@@ -1425,7 +1464,7 @@ uint idx;
        if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
                return bt->err;
 
-       bt_unlockpage (BtLockParent, set->latch);
+       bt_unlockpage (bt, BtLockParent, set->latch);
        bt_unpinlatch(set->latch);
        return 0;
 }
@@ -1453,8 +1492,8 @@ uint idx;
        else
                return bt->err;
 
-       bt_lockpage (BtLockDelete, child->latch);
-       bt_lockpage (BtLockWrite, child->latch);
+       bt_lockpage (bt, BtLockDelete, child->latch);
+       bt_lockpage (bt, BtLockWrite, child->latch);
 
        memcpy (root->page, child->page, bt->mgr->page_size);
        root->latch->dirty = 1;
@@ -1463,7 +1502,7 @@ uint idx;
 
   } while( root->page->lvl > 1 && root->page->act == 1 );
 
-  bt_unlockpage (BtLockWrite, root->latch);
+  bt_unlockpage (bt, BtLockWrite, root->latch);
   bt_unpinlatch (root->latch);
   return 0;
 }
@@ -1496,7 +1535,7 @@ BtKey *ptr;
        else
                return 0;
 
-       bt_lockpage (BtLockWrite, right->latch);
+       bt_lockpage (bt, BtLockWrite, right->latch);
 
        // cache copy of key to update
 
@@ -1519,11 +1558,11 @@ BtKey *ptr;
        right->latch->dirty = 1;
        right->page->kill = 1;
 
-       bt_lockpage (BtLockParent, right->latch);
-       bt_unlockpage (BtLockWrite, right->latch);
+       bt_lockpage (bt, BtLockParent, right->latch);
+       bt_unlockpage (bt, BtLockWrite, right->latch);
 
-       bt_lockpage (BtLockParent, set->latch);
-       bt_unlockpage (BtLockWrite, set->latch);
+       bt_lockpage (bt, BtLockParent, set->latch);
+       bt_unlockpage (bt, BtLockWrite, set->latch);
 
        // redirect higher key directly to our new node contents
 
@@ -1542,14 +1581,13 @@ BtKey *ptr;
 
        //      obtain delete and write locks to right node
 
-       bt_unlockpage (BtLockParent, right->latch);
-       bt_lockpage (BtLockDelete, right->latch);
-       bt_lockpage (BtLockWrite, right->latch);
+       bt_unlockpage (bt, BtLockParent, right->latch);
+       bt_lockpage (bt, BtLockDelete, right->latch);
+       bt_lockpage (bt, BtLockWrite, right->latch);
        bt_freepage (bt, right);
 
-       bt_unlockpage (BtLockParent, set->latch);
+       bt_unlockpage (bt, BtLockParent, set->latch);
        bt_unpinlatch (set->latch);
-       bt->found = 1;
        return 0;
 }
 
@@ -1600,7 +1638,7 @@ BtVal *val;
          if( bt_fixfence (bt, set, lvl) )
                return bt->err;
          else
-               return bt->found = found, 0;
+               return 0;
 
        //      do we need to collapse root?
 
@@ -1608,7 +1646,7 @@ BtVal *val;
          if( bt_collapseroot (bt, set) )
                return bt->err;
          else
-               return bt->found = found, 0;
+               return 0;
 
        //      delete empty page
 
@@ -1616,9 +1654,9 @@ BtVal *val;
                return bt_deletepage (bt, set);
 
        set->latch->dirty = 1;
-       bt_unlockpage(BtLockWrite, set->latch);
+       bt_unlockpage(bt, BtLockWrite, set->latch);
        bt_unpinlatch (set->latch);
-       return bt->found = found, 0;
+       return 0;
 }
 
 BtKey *bt_foundkey (BtDb *bt)
@@ -1648,13 +1686,13 @@ uid page_no;
 
        // obtain access lock using lock chaining with Access mode
 
-       bt_lockpage(BtLockAccess, set->latch);
+       bt_lockpage(bt, BtLockAccess, set->latch);
 
-       bt_unlockpage(BtLockRead, prevlatch);
+       bt_unlockpage(bt, BtLockRead, prevlatch);
        bt_unpinlatch (prevlatch);
 
-       bt_lockpage(BtLockRead, set->latch);
-       bt_unlockpage(BtLockAccess, set->latch);
+       bt_lockpage(bt, BtLockRead, set->latch);
+       bt_unlockpage(bt, BtLockAccess, set->latch);
        return 1;
 }
 
@@ -1712,7 +1750,7 @@ BtVal *val;
 
    } while( slot = bt_findnext (bt, set, slot) );
 
-  bt_unlockpage (BtLockRead, set->latch);
+  bt_unlockpage (bt, BtLockRead, set->latch);
   bt_unpinlatch (set->latch);
   return ret;
 }
@@ -1757,7 +1795,9 @@ BtVal *val;
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 2;
-               if( cnt < max && slotptr(bt->frame,cnt)->dead )
+
+               if( cnt < max || bt->frame->lvl )
+                 if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy the value across
@@ -1774,11 +1814,9 @@ BtVal *val;
 
                // make a librarian slot
 
-               if( idx ) {
-                       slotptr(page, ++idx)->off = nxt;
-                       slotptr(page, idx)->type = Librarian;
-                       slotptr(page, idx)->dead = 1;
-               }
+               slotptr(page, ++idx)->off = nxt;
+               slotptr(page, idx)->type = Librarian;
+               slotptr(page, idx)->dead = 1;
 
                // set up the slot
 
@@ -1868,7 +1906,7 @@ BtVal *val;
 
        // release and unpin root pages
 
-       bt_unlockpage(BtLockWrite, root->latch);
+       bt_unlockpage(bt, BtLockWrite, root->latch);
        bt_unpinlatch (root->latch);
 
        bt_unpinlatch (right);
@@ -1898,8 +1936,10 @@ uint prev;
        idx = 0;
 
        while( cnt++ < max ) {
-               if( slotptr(set->page, cnt)->dead && cnt < max )
+               if( cnt < max || set->page->lvl )
+                 if( slotptr(set->page, cnt)->dead )
                        continue;
+
                src = valptr(set->page, cnt);
                nxt -= src->len + sizeof(BtVal);
                memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
@@ -1911,11 +1951,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(bt->frame, ++idx)->off = nxt;
-                       slotptr(bt->frame, idx)->type = Librarian;
-                       slotptr(bt->frame, idx)->dead = 1;
-               }
+               slotptr(bt->frame, ++idx)->off = nxt;
+               slotptr(bt->frame, idx)->type = Librarian;
+               slotptr(bt->frame, idx)->dead = 1;
 
                //  add actual slot
 
@@ -1970,11 +2008,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(set->page, ++idx)->off = nxt;
-                       slotptr(set->page, idx)->type = Librarian;
-                       slotptr(set->page, idx)->dead = 1;
-               }
+               slotptr(set->page, ++idx)->off = nxt;
+               slotptr(set->page, idx)->type = Librarian;
+               slotptr(set->page, idx)->dead = 1;
 
                //      add actual slot
 
@@ -2017,10 +2053,10 @@ BtKey *ptr;
 
        // insert new fences in their parent pages
 
-       bt_lockpage (BtLockParent, right);
+       bt_lockpage (bt, BtLockParent, right);
 
-       bt_lockpage (BtLockParent, set->latch);
-       bt_unlockpage (BtLockWrite, set->latch);
+       bt_lockpage (bt, BtLockParent, set->latch);
+       bt_unlockpage (bt, BtLockWrite, set->latch);
 
        // insert new fence for reformulated left block of smaller keys
 
@@ -2038,10 +2074,10 @@ BtKey *ptr;
        if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
                return bt->err;
 
-       bt_unlockpage (BtLockParent, set->latch);
+       bt_unlockpage (bt, BtLockParent, set->latch);
        bt_unpinlatch (set->latch);
 
-       bt_unlockpage (BtLockParent, right);
+       bt_unlockpage (bt, BtLockParent, right);
        bt_unpinlatch (right);
        return 0;
 }
@@ -2114,7 +2150,7 @@ BtVal *val;
        node->dead = 0;
 
        if( release ) {
-               bt_unlockpage (BtLockWrite, set->latch);
+               bt_unlockpage (bt, BtLockWrite, set->latch);
                bt_unpinlatch (set->latch);
        }
 
@@ -2199,7 +2235,7 @@ uint type;
                slotptr(set->page, slot)->dead = 0;
                val->len = vallen;
                memcpy (val->value, value, vallen);
-               bt_unlockpage(BtLockWrite, set->latch);
+               bt_unlockpage(bt, BtLockWrite, set->latch);
                bt_unpinlatch (set->latch);
                return 0;
        }
@@ -2233,7 +2269,7 @@ uint type;
        ptr->len = keylen;
        
        slotptr(set->page, slot)->off = set->page->min;
-       bt_unlockpage(BtLockWrite, set->latch);
+       bt_unlockpage(bt, BtLockWrite, set->latch);
        bt_unpinlatch (set->latch);
        return 0;
   }
@@ -2242,22 +2278,23 @@ uint type;
 
 typedef struct {
        uint entry;                     // latch table entry number
-       uint slot:30;           // page slot number
+       uint slot:31;           // page slot number
        uint reuse:1;           // reused previous page
-       uint emptied:1;         // page was emptied
-} AtomicMod;
+} AtomicTxn;
 
 typedef struct {
        uid page_no;            // page number for split leaf
        void *next;                     // next key to insert
-       uint entry;                     // latch table entry number
+       uint entry:29;          // latch table entry number
+       uint type:2;            // 0 == insert, 1 == delete, 2 == free
+       uint nounlock:1;        // don't unlock ParentModification
        unsigned char leafkey[BT_keyarray];
 } AtomicKey;
 
 //     determine actual page where key is located
-//  return slot number with set page locked
+//  return slot number
 
-uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
+uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
 {
 BtKey *key = keyptr(source,src);
 uint slot = locks[src].slot;
@@ -2274,15 +2311,18 @@ uint entry;
                return slot;
        }
 
-       //      is locks->reuse set?
-       //      if so, find where our key
-       //      is located on previous page or split pages
+       //      is locks->reuse set? or was slot zeroed?
+       //      if so, find where our key is located 
+       //      on current page or pages split on
+       //      same page txn operations.
 
        do {
                set->latch = bt->mgr->latchsets + entry;
                set->page = bt_mappage (bt, set->latch);
 
                if( slot = bt_findslot(set->page, key->key, key->len) ) {
+                 if( slotptr(set->page, slot)->type == Librarian )
+                       slot++;
                  if( locks[src].reuse )
                        locks[src].entry = entry;
                  return slot;
@@ -2293,17 +2333,17 @@ uint entry;
        return 0;
 }
 
-BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
+BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
 {
 BtKey *key = keyptr(source, src);
 BtVal *val = valptr(source, src);
 BtLatchSet *latch;
 BtPageSet set[1];
-uint entry;
+uint entry, slot;
 
-  while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
-       if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
-         return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
+  while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
+       if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
+         return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
 
        if( entry = bt_splitpage (bt, set) )
          latch = bt->mgr->latchsets + entry;
@@ -2313,15 +2353,16 @@ uint entry;
        //      splice right page into split chain
        //      and WriteLock it.
 
+       bt_lockpage(bt, BtLockWrite, latch);
        latch->split = set->latch->split;
        set->latch->split = entry;
-       bt_lockpage(BtLockWrite, latch);
+       locks[src].slot = 0;
   }
 
   return bt->err = BTERR_atomic;
 }
 
-BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
+BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
 {
 BtKey *key = keyptr(source, src);
 uint idx, entry, slot;
@@ -2330,37 +2371,108 @@ BtKey *ptr;
 BtVal *val;
 
        if( slot = bt_atomicpage (bt, source, locks, src, set) )
+         ptr = keyptr(set->page, slot);
+       else
+         return bt->err = BTERR_struct;
+
+       if( !keycmp (ptr, key->key, key->len) )
+         if( !slotptr(set->page, slot)->dead )
                slotptr(set->page, slot)->dead = 1;
+         else
+               return 0;
        else
-               return bt->err = BTERR_struct;
+               return 0;
 
-       ptr = keyptr(set->page, slot);
        val = valptr(set->page, slot);
-
        set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
-       set->page->act--;
-
-       // collapse empty slots beneath the fence
-
-       while( idx = set->page->cnt - 1 )
-         if( slotptr(set->page, idx)->dead ) {
-               *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
-               memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
-         } else
-               break;
-
        set->latch->dirty = 1;
+       set->page->act--;
+       bt->found++;
        return 0;
 }
 
-//     qsort comparison function
+//     delete an empty master page for a transaction
+
+//     note that the far right page never empties because
+//     it always contains (at least) the infinite stopper key
+//     and that all pages that don't contain any keys are
+//     deleted, or are being held under Atomic lock.
 
-int bt_qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
+BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
 {
-BtKey *key1 = (BtKey *)((unsigned char *)page + slot1->off);
-BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off);
+BtPageSet right[1], temp[1];
+unsigned char value[BtId];
+uid right_page_no;
+BtKey *ptr;
+
+       bt_lockpage(bt, BtLockWrite, prev->latch);
+
+       //      grab the right sibling
+
+       if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
+               right->page = bt_mappage (bt, right->latch);
+       else
+               return bt->err;
+
+       bt_lockpage(bt, BtLockAtomic, right->latch);
+       bt_lockpage(bt, BtLockWrite, right->latch);
+
+       //      and pull contents over empty page
+       //      while preserving master's left link
+
+       memcpy (right->page->left, prev->page->left, BtId);
+       memcpy (prev->page, right->page, bt->mgr->page_size);
+
+       //      forward seekers to old right sibling
+       //      to new page location in set
+
+       bt_putid (right->page->right, prev->latch->page_no);
+       right->latch->dirty = 1;
+       right->page->kill = 1;
+
+       //      remove pointer to right page for searchers by
+       //      changing right fence key to point to the master page
+
+       ptr = keyptr(right->page,right->page->cnt);
+       bt_putid (value, prev->latch->page_no);
+
+       if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+               return bt->err;
+
+       //  now that master page is in good shape we can
+       //      remove its locks.
+
+       bt_unlockpage (bt, BtLockAtomic, prev->latch);
+       bt_unlockpage (bt, BtLockWrite, prev->latch);
+
+       //  fix master's right sibling's left pointer
+       //      to remove scanner's poiner to the right page
+
+       if( right_page_no = bt_getid (prev->page->right) ) {
+         if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
+               temp->page = bt_mappage (bt, temp->latch);
+
+         bt_lockpage (bt, BtLockWrite, temp->latch);
+         bt_putid (temp->page->left, prev->latch->page_no);
+         temp->latch->dirty = 1;
+
+         bt_unlockpage (bt, BtLockWrite, temp->latch);
+         bt_unpinlatch (temp->latch);
+       } else {        // master is now the far right page
+         bt_spinwritelock (bt->mgr->lock);
+         bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
+         bt_spinreleasewrite(bt->mgr->lock);
+       }
 
-       return keycmp (key1, key2->key, key2->len);
+       //      now that there are no pointers to the right page
+       //      we can delete it after the last read access occurs
+
+       bt_unlockpage (bt, BtLockWrite, right->latch);
+       bt_unlockpage (bt, BtLockAtomic, right->latch);
+       bt_lockpage (bt, BtLockDelete, right->latch);
+       bt_lockpage (bt, BtLockWrite, right->latch);
+       bt_freepage (bt, right);
+       return 0;
 }
 
 //     atomic modification of a batch of keys.
@@ -2370,29 +2482,29 @@ BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off);
 //     causing the key constraint violation
 //     or zero on successful completion.
 
-int bt_atomicmods (BtDb *bt, BtPage source)
+int bt_atomictxn (BtDb *bt, BtPage source)
 {
-uint src, idx, slot, samepage, entry, next, split;
+uint src, idx, slot, samepage, entry;
 AtomicKey *head, *tail, *leaf;
 BtPageSet set[1], prev[1];
 unsigned char value[BtId];
 BtKey *key, *ptr, *key2;
 BtLatchSet *latch;
-AtomicMod *locks;
+AtomicTxn *locks;
 int result = 0;
 BtSlot temp[1];
 BtPage page;
 BtVal *val;
+uid right;
 int type;
 
-  locks = calloc (source->cnt + 1, sizeof(AtomicMod));
+  locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
   head = NULL;
   tail = NULL;
 
-  // first sort the list of keys into order to
+  // stable sort the list of keys into order to
   //   prevent deadlocks between threads.
-  qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)bt_qsortcmp, source);
-/*
+
   for( src = 1; src++ < source->cnt; ) {
        *temp = *slotptr(source,src);
        key = keyptr (source,src);
@@ -2406,13 +2518,13 @@ int type;
                break;
        }
   }
-*/
-  // Load the leafpage for each key
+
+  // Load the leaf page for each key
+  // group same page references with reuse bit
   // and determine any constraint violations
 
   for( src = 0; src++ < source->cnt; ) {
        key = keyptr(source, src);
-       val = valptr(source, src);
        slot = 0;
 
        // first determine if this modification falls
@@ -2420,13 +2532,13 @@ int type;
        //      note that the far right leaf page is a special case
 
        if( samepage = src > 1 )
-         if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 )
+         if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
                slot = bt_findslot(set->page, key->key, key->len);
-         else // release read on previous page
-               bt_unlockpage(BtLockRead, set->latch); 
+         else
+               bt_unlockpage(bt, BtLockRead, set->latch); 
 
        if( !slot )
-         if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) )
+         if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
                set->latch->split = 0;
          else
                return -1;
@@ -2451,128 +2563,124 @@ int type;
        case Unique:
          if( !slotptr(set->page, slot)->dead )
           if( slot < set->page->cnt || bt_getid (set->page->right) )
-           if( ptr->len == key->len && !memcmp (ptr->key, key->key, key->len) ) {
+           if( !keycmp (ptr, key->key, key->len) ) {
 
                  // return constraint violation if key already exists
 
+                 bt_unlockpage(bt, BtLockRead, set->latch);
                  result = src;
 
                  while( src ) {
                        if( locks[src].entry ) {
                          set->latch = bt->mgr->latchsets + locks[src].entry;
-                         bt_unlockpage(BtLockAtomic, set->latch);
-                         bt_unlockpage(BtLockRead, set->latch);
+                         bt_unlockpage(bt, BtLockAtomic, set->latch);
                          bt_unpinlatch (set->latch);
                        }
                        src--;
                  }
-
+                 free (locks);
                  return result;
                }
-
          break;
-
-       case Delete:
-         if( !slotptr(set->page, slot)->dead )
-          if( ptr->len == key->len )
-               if( !memcmp (ptr->key, key->key, key->len) )
-             break;
-
-         //  Key to delete doesn't exist
-
-         result = src;
-
-         while( src ) {
-               if( locks[src].entry ) {
-                 set->latch = bt->mgr->latchsets + locks[src].entry;
-                 bt_unlockpage(BtLockAtomic, set->latch);
-                 bt_unlockpage(BtLockRead, set->latch);
-                 bt_unpinlatch (set->latch);
-               }
-               src--;
-         }
-
-         return result;
        }
-
   }
 
   //  unlock last loadpage lock
 
-  if( source->cnt > 1 )
-       bt_unlockpage(BtLockRead, set->latch);
+  if( source->cnt )
+       bt_unlockpage(bt, BtLockRead, set->latch);
 
-  //  obtain write lock for each page
+  //  obtain write lock for each master page
 
   for( src = 0; src++ < source->cnt; )
-       if( locks[src].entry )
-         bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
+       if( locks[src].reuse )
+         continue;
+       else
+         bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
 
   // insert or delete each key
+  // process any splits or merges
+  // release Write & Atomic latches
+  // set ParentModifications and build
+  // queue of keys to insert for split pages
+  // or delete for deleted pages.
 
-  for( src = 0; src++ < source->cnt; )
-       if( slotptr(source,src)->type == Delete )
-         if( bt_atomicdelete (bt, source, locks, src) )
-               return -1;
-         else
-               continue;
-       else
-         if( bt_atomicinsert (bt, source, locks, src) )
-               return -1;
-         else
-               continue;
+  // run through txn list backwards
 
-  // set ParentModification and release WriteLock
-  // leave empty pages locked for later processing
-  // build queue of keys to insert for split pages
+  samepage = source->cnt + 1;
 
-  for( src = 0; src++ < source->cnt; ) {
+  for( src = source->cnt; src; src-- ) {
        if( locks[src].reuse )
          continue;
 
-       prev->latch = bt->mgr->latchsets + locks[src].entry;
-       prev->page = bt_mappage (bt, prev->latch);
+       //  perform the txn operations
+       //      from smaller to larger on
+       //  the same page
+
+       for( idx = src; idx < samepage; idx++ )
+        switch( slotptr(source,idx)->type ) {
+        case Delete:
+         if( bt_atomicdelete (bt, source, locks, idx) )
+               return -1;
+         break;
 
-       //  pick-up all splits from original page
+       case Duplicate:
+       case Unique:
+         if( bt_atomicinsert (bt, source, locks, idx) )
+               return -1;
+         break;
+       }
 
-       split = next = prev->latch->split;
+       //      after the same page operations have finished,
+       //  process master page for splits or deletion.
 
-       if( !prev->page->act )
-         locks[src].emptied = 1;
+       latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
+       prev->page = bt_mappage (bt, prev->latch);
+       samepage = src;
+
+       //  pick-up all splits from master page
+       //      each one is already WriteLocked.
+
+       entry = prev->latch->split;
 
-       while( entry = next ) {
+       while( entry ) {
          set->latch = bt->mgr->latchsets + entry;
          set->page = bt_mappage (bt, set->latch);
-         next = set->latch->split;
-         set->latch->split = 0;
+         entry = set->latch->split;
 
-         // delete empty previous page
+         // delete empty master page by undoing its split
+         //  (this is potentially another empty page)
+         //  note that there are no new left pointers yet
 
          if( !prev->page->act ) {
+               memcpy (set->page->left, prev->page->left, BtId);
                memcpy (prev->page, set->page, bt->mgr->page_size);
-               bt_lockpage (BtLockDelete, set->latch);
+               bt_lockpage (bt, BtLockDelete, set->latch);
                bt_freepage (bt, set);
 
                prev->latch->dirty = 1;
+               continue;
+         }
 
-               if( prev->page->act )
-                 locks[src].emptied = 0;
+         // remove empty page from the split chain
 
+         if( !set->page->act ) {
+               memcpy (prev->page->right, set->page->right, BtId);
+               prev->latch->split = set->latch->split;
+               bt_lockpage (bt, BtLockDelete, set->latch);
+               bt_freepage (bt, set);
                continue;
          }
 
-         // prev page is not emptied 
-         locks[src].emptied = 0;
-
-         //  schedule previous fence key update
+         //  schedule prev fence key update
 
          ptr = keyptr(prev->page,prev->page->cnt);
-         leaf = malloc (sizeof(AtomicKey));
+         leaf = calloc (sizeof(AtomicKey), 1);
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
          leaf->page_no = prev->latch->page_no;
          leaf->entry = prev->latch->entry;
-         leaf->next = NULL;
+         leaf->type = 0;
 
          if( tail )
                tail->next = leaf;
@@ -2581,40 +2689,95 @@ int type;
 
          tail = leaf;
 
-         // remove empty block from the split chain
-
-         if( !set->page->act ) {
-               memcpy (prev->page->right, set->page->right, BtId);
-               bt_lockpage (BtLockDelete, set->latch);
-               bt_freepage (bt, set);
-               continue;
-         }
+         // splice in the left link into the split page
 
-         bt_lockpage(BtLockParent, prev->latch);
-         bt_unlockpage(BtLockWrite, prev->latch);
+         bt_putid (set->page->left, prev->latch->page_no);
+         bt_lockpage(bt, BtLockParent, prev->latch);
+         bt_unlockpage(bt, BtLockWrite, prev->latch);
          *prev = *set;
        }
 
-       //  was entire chain emptied?
+       //  update left pointer in next right page from last split page
+       //      (if all splits were reversed, latch->split == 0)
 
-       if( !prev->page->act )
-               continue;
+       if( latch->split ) {
+         //  fix left pointer in master's original (now split)
+         //  far right sibling or set rightmost page in page zero
 
-       if( !split ) {
-               bt_unlockpage(BtLockWrite, prev->latch);
-               continue;
+         if( right = bt_getid (prev->page->right) ) {
+               if( set->latch = bt_pinlatch (bt, right, 1) )
+                 set->page = bt_mappage (bt, set->latch);
+               else
+                 return -1;
+
+           bt_lockpage (bt, BtLockWrite, set->latch);
+           bt_putid (set->page->left, prev->latch->page_no);
+               set->latch->dirty = 1;
+           bt_unlockpage (bt, BtLockWrite, set->latch);
+               bt_unpinlatch (set->latch);
+         } else {      // prev is rightmost page
+           bt_spinwritelock (bt->mgr->lock);
+               bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
+           bt_spinreleasewrite(bt->mgr->lock);
+         }
+
+         //  Process last page split in chain
+
+         ptr = keyptr(prev->page,prev->page->cnt);
+         leaf = calloc (sizeof(AtomicKey), 1);
+
+         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+         leaf->page_no = prev->latch->page_no;
+         leaf->entry = prev->latch->entry;
+         leaf->type = 0;
+  
+         if( tail )
+               tail->next = leaf;
+         else
+               head = leaf;
+
+         tail = leaf;
+
+         bt_lockpage(bt, BtLockParent, prev->latch);
+         bt_unlockpage(bt, BtLockWrite, prev->latch);
+
+         //  remove atomic lock on master page
+
+         bt_unlockpage(bt, BtLockAtomic, latch);
+         continue;
+       }
+
+       //  finished if prev page occupied (either master or final split)
+
+       if( prev->page->act ) {
+         bt_unlockpage(bt, BtLockWrite, latch);
+         bt_unlockpage(bt, BtLockAtomic, latch);
+         bt_unpinlatch(latch);
+         continue;
        }
 
-       //      Process last page split in chain
+       // any and all splits were reversed, and the
+       // master page located in prev is empty, delete it
+       // by pulling over master's right sibling.
+
+       // Remove empty master's fence key
 
        ptr = keyptr(prev->page,prev->page->cnt);
-       leaf = malloc (sizeof(AtomicKey));
+
+       if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
+               return -1;
+
+       //      perform the remainder of the delete
+       //      from the FIFO queue
+
+       leaf = calloc (sizeof(AtomicKey), 1);
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
        leaf->page_no = prev->latch->page_no;
        leaf->entry = prev->latch->entry;
-       leaf->next = NULL;
-
+       leaf->nounlock = 1;
+       leaf->type = 2;
+  
        if( tail )
          tail->next = leaf;
        else
@@ -2622,50 +2785,46 @@ int type;
 
        tail = leaf;
 
-       bt_lockpage(BtLockParent, prev->latch);
-       bt_unlockpage(BtLockWrite, prev->latch);
+       //      leave atomic lock in place until
+       //      deletion completes in next phase.
+
+       bt_unlockpage(bt, BtLockWrite, prev->latch);
   }
   
-  // remove Atomic locks and
-  // process any empty pages
-
-  for( src = source->cnt; src; src-- ) {
-       if( locks[src].reuse )
-         continue;
+  //  add & delete keys for any pages split or merged during transaction
 
-       set->latch = bt->mgr->latchsets + locks[src].entry;
-       set->page = bt_mappage (bt, set->latch);
+  if( leaf = head )
+    do {
+         set->latch = bt->mgr->latchsets + leaf->entry;
+         set->page = bt_mappage (bt, set->latch);
 
-       //  clear original page split field
+         bt_putid (value, leaf->page_no);
+         ptr = (BtKey *)leaf->leafkey;
 
-       split = set->latch->split;
-       set->latch->split = 0;
-       bt_unlockpage (BtLockAtomic, set->latch);
+         switch( leaf->type ) {
+         case 0:       // insert key
+           if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+                 return -1;
 
-       //  delete page emptied by our atomic action
+               break;
 
-       if( locks[src].emptied )
-         if( bt_deletepage (bt, set) )
-               return bt->err;
-         else
-               continue;
+         case 1:       // delete key
+               if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
+                 return -1;
 
-       if( !split )
-               bt_unpinlatch (set->latch);
-  }
+               break;
 
-  //  add keys for any pages split during action
+         case 2:       // free page
+               if( bt_atomicfree (bt, set) )
+                 return -1;
 
-  if( leaf = head )
-    do {
-         ptr = (BtKey *)leaf->leafkey;
-         bt_putid (value, leaf->page_no);
+               break;
+         }
 
-         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
-               return bt->err;
+         if( !leaf->nounlock )
+           bt_unlockpage (bt, BtLockParent, set->latch);
 
-         bt_unlockpage (BtLockParent, bt->mgr->latchsets + leaf->entry);
-         bt_unpinlatch (bt->mgr->latchsets + leaf->entry);
+         bt_unpinlatch (set->latch);
          tail = leaf->next;
          free (leaf);
        } while( leaf = tail );
@@ -2676,6 +2835,70 @@ int type;
   return 0;
 }
 
+//     set cursor to highest slot on highest page
+
+uint bt_lastkey (BtDb *bt)
+{
+uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
+BtPageSet set[1];
+
+       if( set->latch = bt_pinlatch (bt, page_no, 1) )
+               set->page = bt_mappage (bt, set->latch);
+       else
+               return 0;
+
+    bt_lockpage(bt, BtLockRead, set->latch);
+       memcpy (bt->cursor, set->page, bt->mgr->page_size);
+    bt_unlockpage(bt, BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+
+       bt->cursor_page = page_no;
+       return bt->cursor->cnt;
+}
+
+//     return previous slot on cursor page
+
+uint bt_prevkey (BtDb *bt, uint slot)
+{
+uid ourright, next, us = bt->cursor_page;
+BtPageSet set[1];
+
+       if( --slot )
+               return slot;
+
+       ourright = bt_getid(bt->cursor->right);
+
+goleft:
+       if( !(next = bt_getid(bt->cursor->left)) )
+               return 0;
+
+findourself:
+       bt->cursor_page = next;
+
+       if( set->latch = bt_pinlatch (bt, next, 1) )
+               set->page = bt_mappage (bt, set->latch);
+       else
+               return 0;
+
+    bt_lockpage(bt, BtLockRead, set->latch);
+       memcpy (bt->cursor, set->page, bt->mgr->page_size);
+       bt_unlockpage(bt, BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       
+       next = bt_getid (bt->cursor->right);
+
+       if( bt->cursor->kill )
+               goto findourself;
+
+       if( next != us )
+         if( next == ourright )
+               goto goleft;
+         else
+               goto findourself;
+
+       return bt->cursor->cnt;
+}
+
 //  return next slot on cursor page
 //  or slide cursor right into next page
 
@@ -2705,11 +2928,11 @@ uid right;
        else
                return 0;
 
-    bt_lockpage(BtLockRead, set->latch);
+    bt_lockpage(bt, BtLockRead, set->latch);
 
        memcpy (bt->cursor, set->page, bt->mgr->page_size);
 
-       bt_unlockpage(BtLockRead, set->latch);
+       bt_unlockpage(bt, BtLockRead, set->latch);
        bt_unpinlatch (set->latch);
        slot = 0;
 
@@ -2734,7 +2957,7 @@ uint slot;
 
        bt->cursor_page = set->latch->page_no;
 
-       bt_unlockpage(BtLockRead, set->latch);
+       bt_unlockpage(bt, BtLockRead, set->latch);
        bt_unpinlatch (set->latch);
        return slot;
 }
@@ -2828,7 +3051,7 @@ uint slot = 0;
                        fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
                memset ((ushort *)latch->access, 0, sizeof(RWLock));
 
-               if( *latch->parent->rin & MASK )
+               if( *latch->parent->tid )
                        fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
                memset ((ushort *)latch->parent, 0, sizeof(RWLock));
 
@@ -2861,9 +3084,9 @@ BtKey *ptr;
                        fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
                memset ((ushort *)latch->access, 0, sizeof(RWLock));
 
-               if( *latch->parent->rin & MASK )
+               if( *latch->parent->tid )
                        fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
-               memset ((ushort *)latch->parent, 0, sizeof(RWLock));
+               memset ((ushort *)latch->parent, 0, sizeof(WOLock));
 
                if( latch->pin ) {
                        fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
@@ -2930,12 +3153,12 @@ void *index_file (void *arg)
 uint __stdcall index_file (void *arg)
 #endif
 {
-int line = 0, found = 0, cnt = 0, unique;
+int line = 0, found = 0, cnt = 0, idx;
 uid next, page_no = LEAF_page; // start on first page of leaves
+int ch, len = 0, slot, type = 0;
 unsigned char key[BT_maxkey];
 unsigned char txn[65536];
 ThreadArg *args = arg;
-int ch, len = 0, slot;
 BtPageSet set[1];
 uint nxt = 65536;
 BtPage page;
@@ -2947,9 +3170,12 @@ FILE *in;
        bt = bt_open (args->mgr);
        page = (BtPage)txn;
 
-       unique = (args->type[1] | 0x20) != 'd';
+       if( args->idx < strlen (args->type) )
+               ch = args->type[args->idx];
+       else
+               ch = args->type[strlen(args->type) - 1];
 
-       switch(args->type[0] | 0x20)
+       switch(ch | 0x20)
        {
        case 'a':
                fprintf(stderr, "started latch mgr audit\n");
@@ -2957,13 +3183,37 @@ FILE *in;
                fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
                break;
 
-       case 't':
-               fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
+       case 'd':
+               type = Delete;
+
+       case 'p':
+               if( !type )
+                       type = Unique;
+
+               if( args->num )
+                if( type == Delete )
+                 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
+                else
+                 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
+               else
+                if( type == Delete )
+                 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
+                else
+                 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
+
                if( in = fopen (args->infile, "rb") )
                  while( ch = getc(in), ch != EOF )
                        if( ch == '\n' )
                        {
                          line++;
+
+                         if( !args->num ) {
+                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
+                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
+                           len = 0;
+                               continue;
+                         }
+
                          nxt -= len - 10;
                          memcpy (txn + nxt, key + 10, len - 10);
                          nxt -= 1;
@@ -2973,42 +3223,25 @@ FILE *in;
                          nxt -= 1;
                          txn[nxt] = 10;
                          slotptr(page,++cnt)->off  = nxt;
-                         slotptr(page,cnt)->type = Unique;
+                         slotptr(page,cnt)->type = type;
                          len = 0;
 
-                         if( cnt < 25 )
+                         if( cnt < args->num )
                                continue;
 
                          page->cnt = cnt;
                          page->act = cnt;
                          page->min = nxt;
 
-                         if( bt_atomicmods (bt, page) )
+                         if( bt_atomictxn (bt, page) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
-                         nxt = 65536;
+                         nxt = sizeof(txn);
                          cnt = 0;
 
                        }
                        else if( len < BT_maxkey )
                                key[len++] = ch;
-               fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
-               break;
-
-       case 'p':
-               fprintf(stderr, "started pennysort for %s\n", args->infile);
-               if( in = fopen (args->infile, "rb") )
-                 while( ch = getc(in), ch != EOF )
-                       if( ch == '\n' )
-                       {
-                         line++;
-
-                         if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
-                               fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
-                         len = 0;
-                       }
-                       else if( len < BT_maxkey )
-                               key[len++] = ch;
-               fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
+               fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
                break;
 
        case 'w':
@@ -3019,13 +3252,7 @@ FILE *in;
                        {
                          line++;
 
-                         if( args->num == 1 )
-                               sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
-
-                         else if( args->num )
-                               sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
-
-                         if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
+                         if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
@@ -3034,33 +3261,6 @@ FILE *in;
                fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
                break;
 
-       case 'd':
-               fprintf(stderr, "started deleting keys for %s\n", args->infile);
-               if( in = fopen (args->infile, "rb") )
-                 while( ch = getc(in), ch != EOF )
-                       if( ch == '\n' )
-                       {
-                         line++;
-                         if( args->num == 1 )
-                               sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
-
-                         else if( args->num )
-                               sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
-
-                         if( bt_findkey (bt, key, len, NULL, 0) < 0 )
-                               fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
-                         ptr = (BtKey*)(bt->key);
-                         found++;
-
-                         if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
-                               fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
-                         len = 0;
-                       }
-                       else if( len < BT_maxkey )
-                               key[len++] = ch;
-               fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
-               break;
-
        case 'f':
                fprintf(stderr, "started finding keys for %s\n", args->infile);
                if( in = fopen (args->infile, "rb") )
@@ -3068,12 +3268,6 @@ FILE *in;
                        if( ch == '\n' )
                        {
                          line++;
-                         if( args->num == 1 )
-                               sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
-
-                         else if( args->num )
-                               sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
-
                          if( bt_findkey (bt, key, len, NULL, 0) == 0 )
                                found++;
                          else if( bt->err )
@@ -3092,7 +3286,7 @@ FILE *in;
                                set->page = bt_mappage (bt, set->latch);
                        else
                                fprintf(stderr, "unable to obtain latch"), exit(1);
-                       bt_lockpage (BtLockRead, set->latch);
+                       bt_lockpage (bt, BtLockRead, set->latch);
                        next = bt_getid (set->page->right);
 
                        for( slot = 0; slot++ < set->page->cnt; )
@@ -3111,13 +3305,36 @@ FILE *in;
                                cnt++;
                           }
 
-                       bt_unlockpage (BtLockRead, set->latch);
+                       bt_unlockpage (bt, BtLockRead, set->latch);
                        bt_unpinlatch (set->latch);
                } while( page_no = next );
 
                fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
                break;
 
+       case 'r':
+               fprintf(stderr, "started reverse scan\n");
+               if( slot = bt_lastkey (bt) )
+                  while( slot = bt_prevkey (bt, slot) ) {
+                       if( slotptr(bt->cursor, slot)->dead )
+                         continue;
+
+                       ptr = keyptr(bt->cursor, slot);
+                       len = ptr->len;
+
+                       if( slotptr(bt->cursor, slot)->type == Duplicate )
+                               len -= BtId;
+
+                       fwrite (ptr->key, len, 1, stdout);
+                       val = valptr(bt->cursor, slot);
+                       fwrite (val->value, val->len, 1, stdout);
+                       fputc ('\n', stdout);
+                       cnt++;
+                 }
+
+               fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
+               break;
+
        case 'c':
 #ifdef unix
                posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
@@ -3137,7 +3354,7 @@ FILE *in;
                }
                
                cnt--;  // remove stopper key
-               fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
+               fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
                break;
        }
 
@@ -3171,10 +3388,12 @@ BtKey *ptr;
 BtDb *bt;
 
        if( argc < 3 ) {
-               fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
-               fprintf (stderr, "  where page_bits is the page size in bits\n");
+               fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
+               fprintf (stderr, "  where idx_file is the name of the btree file\n");
+               fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
+               fprintf (stderr, "  page_bits is the page size in bits\n");
                fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
-               fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
+               fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
                fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
                exit(0);
        }