]> pd.if.org Git - btree/blobdiff - threadskv8.c
clean up atomic delete deadlock problem
[btree] / threadskv8.c
index d2cb778cb14ceed4064b8f3f17d79101d4fc9fab..f2a23c7d1ecbfddbf4957e65831b386a973c9946 100644 (file)
@@ -3,10 +3,11 @@
 //     phase-fair reader writer lock,
 //     librarian page split code,
 //     duplicate key management
+//     bi-directional cursors
 //     traditional buffer pool manager
-//     and ACID batched key updates
+//     and ACID batched key-value updates
 
-// 21 SEP 2014
+// 26 SEP 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -154,11 +155,6 @@ typedef struct {
        uint prev;                              // prev entry in hash table chain
        volatile ushort pin;    // number of outstanding threads
        ushort dirty:1;                 // page in cache is dirty
-#ifdef unix
-       pthread_t atomictid;    // pid holding atomic lock
-#else
-       uint atomictid;                 // pid holding atomic lock
-#endif
 } BtLatchSet;
 
 //     Define the length of the page record numbers
@@ -188,7 +184,8 @@ typedef enum {
        Unique,
        Librarian,
        Duplicate,
-       Delete
+       Delete,
+       Update
 } BtSlotType;
 
 typedef struct {
@@ -235,6 +232,8 @@ typedef struct BtPage_ {
        unsigned char lvl:7;            // level of page
        unsigned char kill:1;           // page is being deleted
        unsigned char right[BtId];      // page number to right
+       unsigned char left[BtId];       // page number to left
+       unsigned char filler[2];        // padding to multiple of 8
 } *BtPage;
 
 //  The loadpage interface object
@@ -649,9 +648,9 @@ BtLatchSet *latch = bt->mgr->latchsets + slot;
                bt->mgr->latchsets[latch->next].prev = slot;
 
        bt->mgr->hashtable[hashidx].slot = slot;
-       memset (&latch->atomictid, 0, sizeof(latch->atomictid));
        latch->page_no = page_no;
        latch->entry = slot;
+       latch->split = 0;
        latch->prev = 0;
        latch->pin = 1;
 
@@ -976,6 +975,11 @@ BtVal *val;
        pagezero->alloc->bits = mgr->page_bits;
        bt_putid(pagezero->alloc->right, MIN_lvl+1);
 
+       //  initialize left-most LEAF page in
+       //      alloc->left.
+
+       bt_putid (pagezero->alloc->left, LEAF_page);
+
        if( bt_writepage (mgr, pagezero->alloc, 0) ) {
                fprintf (stderr, "Unable to create btree page zero\n");
                return bt_mgrclose (mgr), NULL;
@@ -1109,62 +1113,52 @@ int ans;
 
 // place write, read, or parent lock on requested page_no.
 
-void bt_lockpage(BtLock mode, BtLatchSet *set)
+void bt_lockpage(BtLock mode, BtLatchSet *latch)
 {
        switch( mode ) {
        case BtLockRead:
-               ReadLock (set->readwr);
+               ReadLock (latch->readwr);
                break;
        case BtLockWrite:
-               WriteLock (set->readwr);
+               WriteLock (latch->readwr);
                break;
        case BtLockAccess:
-               ReadLock (set->access);
+               ReadLock (latch->access);
                break;
        case BtLockDelete:
-               WriteLock (set->access);
+               WriteLock (latch->access);
                break;
        case BtLockParent:
-               WriteLock (set->parent);
+               WriteLock (latch->parent);
                break;
        case BtLockAtomic:
-               WriteLock (set->atomic);
-               break;
-       case BtLockAtomic | BtLockRead:
-               WriteLock (set->atomic);
-               ReadLock (set->readwr);
+               WriteLock (latch->atomic);
                break;
        }
 }
 
 // remove write, read, or parent lock on requested page
 
-void bt_unlockpage(BtLock mode, BtLatchSet *set)
+void bt_unlockpage(BtLock mode, BtLatchSet *latch)
 {
        switch( mode ) {
        case BtLockRead:
-               ReadRelease (set->readwr);
+               ReadRelease (latch->readwr);
                break;
        case BtLockWrite:
-               WriteRelease (set->readwr);
+               WriteRelease (latch->readwr);
                break;
        case BtLockAccess:
-               ReadRelease (set->access);
+               ReadRelease (latch->access);
                break;
        case BtLockDelete:
-               WriteRelease (set->access);
+               WriteRelease (latch->access);
                break;
        case BtLockParent:
-               WriteRelease (set->parent);
+               WriteRelease (latch->parent);
                break;
        case BtLockAtomic:
-               memset (&set->atomictid, 0, sizeof(set->atomictid));
-               WriteRelease (set->atomic);
-               break;
-       case BtLockAtomic | BtLockRead:
-               ReadRelease (set->readwr);
-               memset (&set->atomictid, 0, sizeof(set->atomictid));
-               WriteRelease (set->atomic);
+               WriteRelease (latch->atomic);
                break;
        }
 }
@@ -1219,14 +1213,14 @@ int blk;
 
 //  find slot in page for given key at a given level
 
-int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
+int bt_findslot (BtPage page, unsigned char *key, uint len)
 {
-uint diff, higher = set->page->cnt, low = 1, slot;
+uint diff, higher = page->cnt, low = 1, slot;
 uint good = 0;
 
        //        make stopper key an infinite fence value
 
-       if( bt_getid (set->page->right) )
+       if( bt_getid (page->right) )
                higher++;
        else
                good++;
@@ -1239,7 +1233,7 @@ uint good = 0;
 
        while( diff = higher - low ) {
                slot = low + ( diff >> 1 );
-               if( keycmp (keyptr(set->page, slot), key, len) < 0 )
+               if( keycmp (keyptr(page, slot), key, len) < 0 )
                        low = slot + 1;
                else
                        higher = slot, good++;
@@ -1276,7 +1270,7 @@ uint mode, prevmode;
 
        set->page = bt_mappage (bt, set->latch);
 
-       //      release & unpin parent page
+       //      release & unpin parent or left sibling page
 
        if( prevpage ) {
          bt_unlockpage(prevmode, prevlatch);
@@ -1284,20 +1278,10 @@ uint mode, prevmode;
          prevpage = 0;
        }
 
-       //      skip Atomic lock on leaf page if already held
-
-       if( !drill )
-        if( mode & BtLockAtomic )
-         if( pthread_equal (set->latch->atomictid, pthread_self()) )
-               mode &= ~BtLockAtomic;
-
        // obtain mode lock using lock chaining through AccessLock
 
        bt_lockpage(mode, set->latch);
 
-       if( mode & BtLockAtomic )
-               set->latch->atomictid = pthread_self();
-
        if( set->page->free )
                return bt->err = BTERR_struct, 0;
 
@@ -1329,15 +1313,17 @@ uint mode, prevmode;
        if( set->page->kill )
          goto slideright;
 
-       if( slot = bt_findslot (set, key, len) ) {
+       if( slot = bt_findslot (set->page, key, len) ) {
          if( drill == lvl )
                return slot;
 
+         // find next non-dead slot -- the fence key if nothing else
+
          while( slotptr(set->page, slot)->dead )
                if( slot++ < set->page->cnt )
-                       continue;
+                 continue;
                else
-                       goto slideright;
+                 return bt->err = BTERR_struct, 0;
 
          page_no = bt_getid(valptr(set->page, slot)->value);
          drill--;
@@ -1377,7 +1363,6 @@ void bt_freepage (BtDb *bt, BtPageSet *set)
 
        bt_unlockpage (BtLockDelete, set->latch);
        bt_unlockpage (BtLockWrite, set->latch);
-       bt_unpinlatch (set->latch);
 
        // unlock allocation page
 
@@ -1459,6 +1444,7 @@ uint idx;
        root->latch->dirty = 1;
 
        bt_freepage (bt, child);
+       bt_unpinlatch (child->latch);
 
   } while( root->page->lvl > 1 && root->page->act == 1 );
 
@@ -1545,6 +1531,7 @@ BtKey *ptr;
        bt_lockpage (BtLockDelete, right->latch);
        bt_lockpage (BtLockWrite, right->latch);
        bt_freepage (bt, right);
+       bt_unpinlatch (right->latch);
 
        bt_unlockpage (BtLockParent, set->latch);
        bt_unpinlatch (set->latch);
@@ -2241,20 +2228,96 @@ uint type;
 
 typedef struct {
        uint entry;                     // latch table entry number
-       uint slot:30;           // page slot number
+       uint slot:31;           // page slot number
        uint reuse:1;           // reused previous page
-       uint emptied:1;         // page was emptied
 } AtomicMod;
 
 typedef struct {
        uid page_no;            // page number for split leaf
        void *next;                     // next key to insert
-       uint entry;                     // latch table entry number
+       uint entry:29;          // latch table entry number
+       uint type:2;            // 0 == insert, 1 == delete, 2 == free
+       uint nounlock:1;        // don't unlock ParentModification
        unsigned char leafkey[BT_keyarray];
 } AtomicKey;
 
+//  find and load leaf page for given key
+//     leave page Atomic locked and Read locked.
+
+int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
+{
+BtLatchSet *prevlatch;
+uid page_no;
+uint slot;
+
+  //  find level zero page
+
+  if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
+       return 0;
+
+  // find next non-dead entry on this page
+  //   it will be the fence key if nothing else
+
+  while( slotptr(set->page, slot)->dead )
+       if( slot++ < set->page->cnt )
+         continue;
+       else
+         return bt->err = BTERR_struct, 0;
+
+  page_no = bt_getid(valptr(set->page, slot)->value);
+  prevlatch = set->latch;
+
+  while( page_no ) {
+       if( set->latch = bt_pinlatch (bt, page_no, 1) )
+         set->page = bt_mappage (bt, set->latch);
+       else
+         return 0;
+
+       if( set->page->free || set->page->lvl )
+               return bt->err = BTERR_struct, 0;
+
+       // obtain read lock using lock chaining with Access mode
+       //      release & unpin parent/left sibling page
+
+       bt_lockpage(BtLockAccess, set->latch);
+
+       bt_unlockpage(BtLockRead, prevlatch);
+       bt_unpinlatch (prevlatch);
+
+       bt_lockpage(BtLockRead, set->latch);
+       bt_unlockpage(BtLockAccess, set->latch);
+
+       //  find key on page at this level
+       //  and descend to requested level
+
+       if( !set->page->kill )
+        if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
+         bt_unlockpage(BtLockRead, set->latch);
+         bt_lockpage(BtLockAtomic, set->latch);
+         bt_lockpage(BtLockAccess, set->latch);
+         bt_lockpage(BtLockRead, set->latch);
+         bt_unlockpage(BtLockAccess, set->latch);
+
+         if( slot = bt_findslot (set->page, key, len) )
+               return slot;
+
+         bt_unlockpage(BtLockAtomic, set->latch);
+         }
+
+       //  slide right into next page
+
+       page_no = bt_getid(set->page->right);
+       prevlatch = set->latch;
+  }
+
+  // return error on end of right chain
+
+  bt->err = BTERR_struct;
+  return 0;    // return error
+}
+
 //     determine actual page where key is located
-//  return slot number with set page locked
+//  return slot number
 
 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
 {
@@ -2263,7 +2326,7 @@ uint slot = locks[src].slot;
 uint entry;
 
        if( src > 1 && locks[src].reuse )
-         entry = locks[src-1].entry, locks[src].entry = entry, slot = 0;
+         entry = locks[src-1].entry, slot = 0;
        else
          entry = locks[src].entry;
 
@@ -2281,9 +2344,11 @@ uint entry;
                set->latch = bt->mgr->latchsets + entry;
                set->page = bt_mappage (bt, set->latch);
 
-               if( slot = bt_findslot(set, key->key, key->len) )
-                       return slot;
-
+               if( slot = bt_findslot(set->page, key->key, key->len) ) {
+                 if( locks[src].reuse )
+                       locks[src].entry = entry;
+                 return slot;
+               }
        } while( entry = set->latch->split );
 
        bt->err = BTERR_atomic;
@@ -2296,15 +2361,20 @@ BtKey *key = keyptr(source, src);
 BtVal *val = valptr(source, src);
 BtLatchSet *latch;
 BtPageSet set[1];
-uint slot, entry;
+uint entry;
+
+  while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
+       if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
+         return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
 
-  while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
-       if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
-         return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
        if( entry = bt_splitpage (bt, set) )
          latch = bt->mgr->latchsets + entry;
        else
          return bt->err;
+
+       //      splice right page into split chain
+       //      and WriteLock it.
+
        latch->split = set->latch->split;
        set->latch->split = entry;
        bt_lockpage(BtLockWrite, latch);
@@ -2330,29 +2400,21 @@ BtVal *val;
        val = valptr(set->page, slot);
 
        set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
-       set->page->act--;
-
-       // collapse empty slots beneath the fence
-
-       while( idx = set->page->cnt - 1 )
-         if( slotptr(set->page, idx)->dead ) {
-               *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
-               memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
-         } else
-               break;
-
        set->latch->dirty = 1;
+       set->page->act--;
        return 0;
 }
 
-//     qsort comparison function
-
-int bt_qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
+uint bt_parentmatch (AtomicKey *head, uint entry)
 {
-BtKey *key1 = (BtKey *)((unsigned char *)page + slot1->off);
-BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off);
+uint cnt = 0;
+
+       if( head )
+         do if( head->entry == entry )
+               head->nounlock = 1, cnt++;
+         while( head = head->next );
 
-       return keycmp (key1, key2->key, key2->len);
+       return cnt;
 }
 
 //     atomic modification of a batch of keys.
@@ -2362,9 +2424,9 @@ BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off);
 //     causing the key constraint violation
 //     or zero on successful completion.
 
-int bt_atomicmods (BtDb *bt, BtPage source)
+int bt_atomictxn (BtDb *bt, BtPage source)
 {
-uint src, idx, slot, samepage, entry, next, split;
+uint src, idx, slot, samepage, entry;
 AtomicKey *head, *tail, *leaf;
 BtPageSet set[1], prev[1];
 unsigned char value[BtId];
@@ -2375,16 +2437,16 @@ int result = 0;
 BtSlot temp[1];
 BtPage page;
 BtVal *val;
+uid right;
 int type;
 
   locks = calloc (source->cnt + 1, sizeof(AtomicMod));
   head = NULL;
   tail = NULL;
 
-  // first sort the list of keys into order to
+  // stable sort the list of keys into order to
   //   prevent deadlocks between threads.
-  qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)bt_qsortcmp, source);
-/*
+
   for( src = 1; src++ < source->cnt; ) {
        *temp = *slotptr(source,src);
        key = keyptr (source,src);
@@ -2398,13 +2460,13 @@ int type;
                break;
        }
   }
-*/
-  // Load the leafpage for each key
+
+  // Load the leaf page for each key
+  // group same page references with reuse bit
   // and determine any constraint violations
 
   for( src = 0; src++ < source->cnt; ) {
        key = keyptr(source, src);
-       val = valptr(source, src);
        slot = 0;
 
        // first determine if this modification falls
@@ -2412,23 +2474,23 @@ int type;
        //      note that the far right leaf page is a special case
 
        if( samepage = src > 1 )
-         if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 )
-               slot = bt_findslot(set, key->key, key->len);
+         if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
+               slot = bt_findslot(set->page, key->key, key->len);
          else // release read on previous page
                bt_unlockpage(BtLockRead, set->latch); 
 
-       if( !slot ) {
-         slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead);
-         set->latch->split = 0;
-       }
-
        if( !slot )
-         return -1;
+         if( slot = bt_atomicload(bt, set, key->key, key->len) )
+               set->latch->split = 0;
+         else
+               return -1;
+
+       if( slotptr(set->page, slot)->type == Librarian )
+         ptr = keyptr(set->page, ++slot);
+       else
+         ptr = keyptr(set->page, slot);
 
        if( !samepage ) {
-         for(idx = 0; idx++ < source->cnt; )
-               if( locks[idx].entry == set->latch->entry )
-                 abort();
          locks[src].entry = set->latch->entry;
          locks[src].slot = slot;
          locks[src].reuse = 0;
@@ -2438,58 +2500,30 @@ int type;
          locks[src].reuse = 1;
        }
 
-       if( slotptr(set->page, slot)->type == Librarian )
-         ptr = keyptr(set->page, ++slot);
-       else
-         ptr = keyptr(set->page, slot);
-
        switch( slotptr(source, src)->type ) {
        case Duplicate:
        case Unique:
          if( !slotptr(set->page, slot)->dead )
           if( slot < set->page->cnt || bt_getid (set->page->right) )
-           if( ptr->len == key->len && !memcmp (ptr->key, key->key, key->len) ) {
+           if( !keycmp (ptr, key->key, key->len) ) {
 
                  // return constraint violation if key already exists
 
+                 bt_unlockpage(BtLockRead, set->latch);
                  result = src;
 
                  while( src ) {
                        if( locks[src].entry ) {
                          set->latch = bt->mgr->latchsets + locks[src].entry;
                          bt_unlockpage(BtLockAtomic, set->latch);
-                         bt_unlockpage(BtLockRead, set->latch);
                          bt_unpinlatch (set->latch);
                        }
                        src--;
                  }
-
+                 free (locks);
                  return result;
                }
-
          break;
-
-       case Delete:
-         if( !slotptr(set->page, slot)->dead )
-          if( ptr->len == key->len )
-               if( !memcmp (ptr->key, key->key, key->len) )
-             break;
-
-         //  Key to delete doesn't exist
-
-         result = src;
-
-         while( src ) {
-               if( locks[src].entry ) {
-                 set->latch = bt->mgr->latchsets + locks[src].entry;
-                 bt_unlockpage(BtLockAtomic, set->latch);
-                 bt_unlockpage(BtLockRead, set->latch);
-                 bt_unpinlatch (set->latch);
-               }
-               src--;
-         }
-
-         return result;
        }
   }
 
@@ -2498,117 +2532,176 @@ int type;
   if( source->cnt > 1 )
        bt_unlockpage(BtLockRead, set->latch);
 
-  //  obtain write lock for each page
+  //  obtain write lock for each master page
 
   for( src = 0; src++ < source->cnt; )
-       if( locks[src].entry )
+       if( locks[src].reuse )
+         continue;
+       else
          bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
 
   // insert or delete each key
+  // process any splits or merges
+  // release Write & Atomic latches
+  // set ParentModifications and build
+  // queue of keys to insert for split pages
+  // or delete for deleted pages.
 
-  for( src = 0; src++ < source->cnt; )
-       if( slotptr(source,src)->type == Delete )
-         if( bt_atomicdelete (bt, source, locks, src) )
-               return -1;
-         else
-               continue;
-         else if( bt_atomicinsert (bt, source, locks, src) )
-               return -1;
-         else
-               continue;
+  // run through txn list backwards
 
-  // set ParentModification and release WriteLock
-  // leave empty pages locked for later processing
-  // build queue of keys to insert for split pages
+  samepage = source->cnt + 1;
 
-  for( src = 0; src++ < source->cnt; ) {
+  for( src = source->cnt; src; src-- ) {
        if( locks[src].reuse )
          continue;
 
-       prev->latch = bt->mgr->latchsets + locks[src].entry;
-       prev->page = bt_mappage (bt, prev->latch);
+       //  perform the txn operations
+       //      from smaller to larger on
+       //  the same page
 
-       //  pick-up all splits from first entry
-       //  save page that points to this entry
+       for( idx = src; idx < samepage; idx++ )
+        switch( slotptr(source,idx)->type ) {
+        case Delete:
+         if( bt_atomicdelete (bt, source, locks, idx) )
+               return -1;
+         break;
 
-       split = next = prev->latch->split;
+       case Duplicate:
+       case Unique:
+         if( bt_atomicinsert (bt, source, locks, idx) )
+               return -1;
+         break;
+       }
+
+       //      after the same page operations have finished,
+       //  process master page for splits or deletion.
 
-       if( !prev->page->act )
-         locks[src].emptied = 1;
+       latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
+       prev->page = bt_mappage (bt, prev->latch);
+       samepage = src;
+
+       //  pick-up all splits from master page
 
-       while( entry = next ) {
+       entry = prev->latch->split;
+
+       while( entry ) {
          set->latch = bt->mgr->latchsets + entry;
          set->page = bt_mappage (bt, set->latch);
-         next = set->latch->split;
+         entry = set->latch->split;
 
-         // delete empty previous page
+         // delete empty master page
 
          if( !prev->page->act ) {
+               memcpy (set->page->left, prev->page->left, BtId);
                memcpy (prev->page, set->page, bt->mgr->page_size);
                bt_lockpage (BtLockDelete, set->latch);
                bt_freepage (bt, set);
+               bt_unpinlatch (set->latch);
 
                prev->latch->dirty = 1;
-
-               if( prev->page->act )
-                 locks[src].emptied = 0;
-
                continue;
          }
 
-         // prev page is not emptied
+         // remove empty page from the split chain
 
-         locks[src].emptied = 0;
+         if( !set->page->act ) {
+               memcpy (prev->page->right, set->page->right, BtId);
+               prev->latch->split = set->latch->split;
+               bt_lockpage (BtLockDelete, set->latch);
+               bt_freepage (bt, set);
+               bt_unpinlatch (set->latch);
+               continue;
+         }
 
-         //  schedule previous fence key update
+         //  schedule prev fence key update
 
          ptr = keyptr(prev->page,prev->page->cnt);
-         leaf = malloc (sizeof(AtomicKey));
+         leaf = calloc (sizeof(AtomicKey), 1);
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
          leaf->page_no = prev->latch->page_no;
          leaf->entry = prev->latch->entry;
-         leaf->next = NULL;
+         leaf->type = 0;
 
          if( tail )
                tail->next = leaf;
          else
                head = leaf;
 
-         latch = prev->latch;
          tail = leaf;
 
-         // remove empty block from the split chain
+         bt_putid (set->page->left, prev->latch->page_no);
+         bt_lockpage(BtLockParent, prev->latch);
+         bt_unlockpage(BtLockWrite, prev->latch);
+         *prev = *set;
+       }
 
-         if( !set->page->act ) {
-               memcpy (prev->page->right, set->page->right, BtId);
-               bt_lockpage (BtLockDelete, set->latch);
-               bt_freepage (bt, set);
+       //  update left pointer in next right page
+       //      if we did any now non-empty splits
+
+       if( latch->split ) {
+         //  fix left pointer in master's original right sibling
+         //    or set rightmost page in page zero
+
+         if( right = bt_getid (prev->page->right) ) {
+               if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
+                 set->page = bt_mappage (bt, set->latch);
+               else
+                 return -1;
+
+           bt_lockpage (BtLockWrite, set->latch);
+           bt_putid (set->page->left, prev->latch->page_no);
+               set->latch->dirty = 1;
+           bt_unlockpage (BtLockWrite, set->latch);
+               bt_unpinlatch (set->latch);
          } else
-               *prev = *set;
+               bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
 
-         bt_lockpage(BtLockParent, latch);
-         bt_unlockpage(BtLockWrite, latch);
+         //  Process last page split in chain
+
+         ptr = keyptr(prev->page,prev->page->cnt);
+         leaf = calloc (sizeof(AtomicKey), 1);
+
+         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+         leaf->page_no = prev->latch->page_no;
+         leaf->entry = prev->latch->entry;
+         leaf->type = 0;
+  
+         if( tail )
+               tail->next = leaf;
+         else
+               head = leaf;
+
+         tail = leaf;
+
+         bt_lockpage(BtLockParent, prev->latch);
+         bt_unlockpage(BtLockWrite, prev->latch);
+         bt_unlockpage(BtLockAtomic, latch);
+         continue;
        }
 
-       if( !prev->page->act )
-               continue;
+       //  finished if prev page occupied (either master or final split)
 
-       if( !split ) {
-               bt_unlockpage(BtLockWrite, prev->latch);
-               continue;
+       if( prev->page->act ) {
+         bt_unlockpage(BtLockWrite, latch);
+         bt_unlockpage(BtLockAtomic, latch);
+         bt_unpinlatch(latch);
+         continue;
        }
 
-       //      Process last page split in chain
+       // any splits were reversed, and the
+       // master page located in prev is empty, delete it
+       // by pulling over master's right sibling.
+       // Delete empty master's fence
 
        ptr = keyptr(prev->page,prev->page->cnt);
-       leaf = malloc (sizeof(AtomicKey));
+       leaf = calloc (sizeof(AtomicKey), 1);
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
        leaf->page_no = prev->latch->page_no;
        leaf->entry = prev->latch->entry;
-       leaf->next = NULL;
-
+       leaf->type = 1;
+  
        if( tail )
          tail->next = leaf;
        else
@@ -2618,45 +2711,104 @@ int type;
 
        bt_lockpage(BtLockParent, prev->latch);
        bt_unlockpage(BtLockWrite, prev->latch);
-  }
+
+       // grab master's right sibling
+
+       if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
+               set->page = bt_mappage (bt, set->latch);
+       else
+               return -1;
+
+       bt_lockpage(BtLockWrite, set->latch);
+
+       //      pull contents over empty page
+
+       memcpy (set->page->left, prev->page->left, BtId);
+       memcpy (prev->page, set->page, bt->mgr->page_size);
+
+       bt_putid (set->page->right, prev->latch->page_no);
+       set->latch->dirty = 1;
+       set->page->kill = 1;
+
+       //      add new fence key for new master page contents, delete
+       //      right sibling after parent posts the new master fence.
+
+       ptr = keyptr(set->page,set->page->cnt);
+       leaf = calloc (sizeof(AtomicKey), 1);
+
+       memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+       leaf->page_no = prev->latch->page_no;
+       leaf->entry = set->latch->entry;
+       leaf->type = 2;
   
-  // remove Atomic locks and
-  // process any empty pages
+       //  see if right sibling is not yet in the FIFO
 
-  for( src = source->cnt; src; src-- ) {
-       if( locks[src].reuse )
-         continue;
+       if( !bt_parentmatch (head, leaf->entry) )
+         bt_lockpage(BtLockParent, set->latch);
 
-       //  delete page emptied by our atomic action
+       bt_unlockpage(BtLockWrite, set->latch);
 
-       set->latch = bt->mgr->latchsets + locks[src].entry;
-       set->page = bt_mappage (bt, set->latch);
+       if( tail )
+         tail->next = leaf;
+       else
+         head = leaf;
 
-       if( locks[src].emptied ) {
-         bt_unlockpage (BtLockAtomic, set->latch);
-         if( bt_deletepage (bt, set) )
-               return bt->err;
-         continue;
-       }
+       tail = leaf;
 
-       bt_unlockpage (BtLockAtomic, set->latch);
+       //  fix master's far right sibling's left pointer
 
-       if( !set->latch->split )
-               bt_unpinlatch (set->latch);
-  }
+       if( right = bt_getid (set->page->right) ) {
+         if( set->latch = bt_pinlatch (bt, right, 1) )
+               set->page = bt_mappage (bt, set->latch);
 
-  //  add keys for any pages split during action
+         bt_lockpage (BtLockWrite, set->latch);
+         bt_putid (set->page->left, prev->latch->page_no);
+         set->latch->dirty = 1;
+
+         bt_unlockpage (BtLockWrite, set->latch);
+         bt_unpinlatch (set->latch);
+       }
+
+       bt_unlockpage(BtLockAtomic, latch);
+  }
+  
+  //  add & delete keys for any pages split or merged during transaction
 
   if( leaf = head )
     do {
-         ptr = (BtKey *)leaf->leafkey;
+         set->latch = bt->mgr->latchsets + leaf->entry;
+         set->page = bt_mappage (bt, set->latch);
+
          bt_putid (value, leaf->page_no);
+         ptr = (BtKey *)leaf->leafkey;
 
-         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
-               return bt->err;
+         switch( leaf->type ) {
+         case 0:       // insert key
+           if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+                 return -1;
+
+               break;
 
-         bt_unlockpage (BtLockParent, bt->mgr->latchsets + leaf->entry);
-         bt_unpinlatch (bt->mgr->latchsets + leaf->entry);
+         case 1:       // delete key
+               if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
+                 return -1;
+
+               break;
+
+         case 2:       // insert key & free
+           if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+                 return -1;
+
+               bt_lockpage (BtLockDelete, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
+               bt_freepage (bt, set);
+               break;
+         }
+
+         if( !leaf->nounlock )
+           bt_unlockpage (BtLockParent, set->latch);
+
+         bt_unpinlatch (set->latch);
          tail = leaf->next;
          free (leaf);
        } while( leaf = tail );
@@ -2667,6 +2819,69 @@ int type;
   return 0;
 }
 
+//     set cursor to highest slot on highest page
+
+uint bt_lastkey (BtDb *bt)
+{
+uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
+BtPageSet set[1];
+uint slot;
+
+       if( set->latch = bt_pinlatch (bt, page_no, 1) )
+               set->page = bt_mappage (bt, set->latch);
+       else
+               return 0;
+
+    bt_lockpage(BtLockRead, set->latch);
+
+       memcpy (bt->cursor, set->page, bt->mgr->page_size);
+       slot = set->page->cnt;
+
+    bt_unlockpage(BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       return slot;
+}
+
+//     return previous slot on cursor page
+
+uint bt_prevkey (BtDb *bt, uint slot)
+{
+uid ourright, next, us = bt->cursor_page;
+BtPageSet set[1];
+
+       if( --slot )
+               return slot;
+
+       ourright = bt_getid(bt->cursor->right);
+
+goleft:
+       if( !(next = bt_getid(bt->cursor->left)) )
+               return 0;
+
+findourself:
+       bt->cursor_page = next;
+
+       if( set->latch = bt_pinlatch (bt, next, 1) )
+               set->page = bt_mappage (bt, set->latch);
+       else
+               return 0;
+
+    bt_lockpage(BtLockRead, set->latch);
+       memcpy (bt->cursor, set->page, bt->mgr->page_size);
+       bt_unlockpage(BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       
+       next = bt_getid (bt->cursor->right);
+
+       if( next != us )
+         if( next == ourright )
+               goto goleft;
+         else
+               goto findourself;
+
+       return bt->cursor->cnt;
+}
+
 //  return next slot on cursor page
 //  or slide cursor right into next page
 
@@ -2964,17 +3179,17 @@ FILE *in;
                          nxt -= 1;
                          txn[nxt] = 10;
                          slotptr(page,++cnt)->off  = nxt;
-                         slotptr(page,cnt)->type = Unique;
+                         slotptr(page,cnt)->type = Delete;
                          len = 0;
 
-                         if( cnt < 25 )
+                         if( cnt < args->num )
                                continue;
 
                          page->cnt = cnt;
                          page->act = cnt;
                          page->min = nxt;
 
-                         if( bt_atomicmods (bt, page) )
+                         if( bt_atomictxn (bt, page) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          nxt = 65536;
                          cnt = 0;