]> pd.if.org Git - btree/commitdiff
Merge branch 'master' of https://github.com/malbrain/Btree-source-code
authorunknown <karl@E04.petzent.com>
Fri, 17 Oct 2014 23:47:34 +0000 (16:47 -0700)
committerunknown <karl@E04.petzent.com>
Fri, 17 Oct 2014 23:47:34 +0000 (16:47 -0700)
Conflicts:
threadskv10b.c

1  2 
threadskv10.c
threadskv10b.c

diff --cc threadskv10.c
index 6ded2dabc32dbd44d4ed7382ea3c4844d42f976f,6ded2dabc32dbd44d4ed7382ea3c4844d42f976f..194757b25f2aa192b379b01511ef567338dd76ad
@@@ -9,7 -9,7 +9,7 @@@
  //    redo log for failure recovery
  //    and LSM B-trees for write optimization
  
--// 11 OCT 2014
++// 15 OCT 2014
  
  // author: karl malbrain, malbrain@cal.berkeley.edu
  
@@@ -113,8 -113,8 +113,8 @@@ typedef enum
  typedef struct {
    union {
        struct {
--        volatile ushort xlock;        // one writer has exclusive lock
--        volatile ushort wrt;          // count of other writers waiting
++        volatile ushort xlock[1];             // one writer has exclusive lock
++        volatile ushort wrt[1];               // count of other writers waiting
        } bits[1];
        uint value[1];
    };
@@@ -132,12 -132,12 +132,18 @@@ typedef struct 
        volatile ushort serving[1];
  } RWLock;
  
--//    write only lock
++//    write only reentrant lock
  
  typedef struct {
--      BtMutexLatch xcl[1];
--      volatile ushort tid[1];
--      volatile ushort dup[1];
++  BtMutexLatch xcl[1];
++  union {
++      struct {
++              volatile ushort tid[1];
++              volatile ushort dup[1];
++      } bits[1];
++      uint value[1];
++  };
++  volatile uint waiters[1];
  } WOLock;
  
  #define PHID 0x1
@@@ -171,7 -171,7 +177,8 @@@ typedef struct 
        uint next;                              // next entry in hash table chain
        uint prev;                              // prev entry in hash table chain
        ushort pin;                             // number of accessing threads
--      unsigned char dirty;    // page in cache is dirty (atomic set)
++      unsigned char dirty;    // page in cache is dirty (atomic setable)
++      unsigned char promote;  // page in cache is dirty (atomic setable)
        BtMutexLatch modify[1]; // modify entry lite latch
  } BtLatchSet;
  
@@@ -327,15 -327,15 +334,6 @@@ typedef struct 
        uint reuse:1;           // reused previous page
  } AtomicTxn;
  
--typedef struct {
--      uid page_no;            // page number for split leaf
--      void *next;                     // next key to insert
--      uint entry:29;          // latch table entry number
--      uint type:2;            // 0 == insert, 1 == delete, 2 == free
--      uint nounlock:1;        // don't unlock ParentModification
--      unsigned char leafkey[BT_keyarray];
--} AtomicKey;
--
  //    Catastrophic errors
  
  typedef enum {
@@@ -487,14 -487,14 +485,14 @@@ uint slept = 0
    while( 1 ) {
        *prev->value = __sync_fetch_and_or(latch->value, XCL);
  
--      if( !prev->bits->xlock ) {                      // did we set XCL?
++      if( !*prev->bits->xlock ) {                     // did we set XCL?
            if( slept )
                  __sync_fetch_and_sub(latch->value, WRT);
                return;
        }
  
        if( !slept ) {
--              prev->bits->wrt++;
++              *prev->bits->wrt += 1;
                __sync_fetch_and_add(latch->value, WRT);
        }
  
@@@ -516,7 -516,7 +514,7 @@@ BtMutexLatch prev[1]
  
        //      take write access if exclusive bit was clear
  
--      return !prev->bits->xlock;
++      return !*prev->bits->xlock;
  }
  
  //    clear write mode
@@@ -527,40 -527,40 +525,82 @@@ BtMutexLatch prev[1]
  
        *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
  
--      if( prev->bits->wrt )
++      if( *prev->bits->wrt )
          sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
  }
  
--//    Write-Only Queue Lock
++//    Write-Only Reentrant Lock
  
  void WriteOLock (WOLock *lock, ushort tid)
  {
++uint prev, waited = 0;
++
    while( 1 ) {
        bt_mutexlock(lock->xcl);
  
--      if( *lock->tid == tid ) {
--              *lock->dup += 1;
++      if( waited )
++              *lock->waiters -= 1;
++
++      if( *lock->bits->tid == tid ) {
++              *lock->bits->dup += 1;
                bt_releasemutex(lock->xcl);
                return;
        }
--      if( !*lock->tid ) {
--              *lock->tid = tid;
++      if( !*lock->bits->tid ) {
++              *lock->bits->tid = tid;
                bt_releasemutex(lock->xcl);
                return;
        }
++
++      waited = 1;
++      *lock->waiters += 1;
++      prev = *lock->value;
++
        bt_releasemutex(lock->xcl);
--      sched_yield();
++      
++      sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, prev, NULL, NULL, QueWr );
    }
  }
  
  void WriteORelease (WOLock *lock)
  {
--      if( *lock->dup ) {
--              *lock->dup -= 1;
--              return;
++  bt_mutexlock(lock->xcl);
++
++  if( *lock->bits->dup ) {
++      *lock->bits->dup -= 1;
++    bt_releasemutex(lock->xcl);
++      return;
++  }
++
++  *lock->bits->tid = 0;
++
++  if( *lock->waiters )
++      sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueWr );
++  bt_releasemutex(lock->xcl);
++}
++
++//    clear lock of holders and waiters
++
++ClearWOLock (WOLock *lock)
++{
++  while( 1 ) {
++      bt_mutexlock(lock->xcl);
++
++      if( *lock->waiters ) {
++              bt_releasemutex(lock->xcl);
++              sched_yield();
++              continue;
++      }
++
++      if( *lock->bits->tid ) {
++              bt_releasemutex(lock->xcl);
++              sched_yield();
++              continue;
        }
  
--      *lock->tid = 0;
++      bt_releasemutex(lock->xcl);
++      return;
++  }
  }
  
  //    Phase-Fair reader/writer lock implementation
@@@ -1493,6 -1493,6 +1533,10 @@@ void bt_lockpage(BtLock mode, BtLatchSe
                WriteOLock (latch->atomic, thread_no);
                ReadLock (latch->readwr, thread_no);
                break;
++      case BtLockAtomic | BtLockWrite:
++              WriteOLock (latch->atomic, thread_no);
++              WriteLock (latch->readwr, thread_no);
++              break;
        }
  }
  
@@@ -1523,6 -1523,6 +1567,10 @@@ void bt_unlockpage(BtLock mode, BtLatch
                WriteORelease (latch->atomic);
                ReadRelease (latch->readwr);
                break;
++      case BtLockAtomic | BtLockWrite:
++              WriteORelease (latch->atomic);
++              WriteRelease (latch->readwr);
++              break;
        }
  }
  
@@@ -1539,7 -1539,7 +1587,7 @@@ int blk
        bt_mutexlock(mgr->lock);
  
        // use empty chain first
--      // else allocate empty page
++      // else allocate new page
  
        if( page_no = bt_getid(mgr->pagezero->freechain) ) {
                if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
  
                mgr->pagezero->activepages++;
                bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
--              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
--                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
  
--              bt_releasemutex(mgr->lock);
++              //  the page is currently free and this
++              //  will keep bt_txnpromote out.
++
++              //      contents will replace this bit
++              //  and pin will keep bt_txnpromote out
  
                contents->page_no = page_no;
--              memcpy (set->page, contents, mgr->page_size);
                set->latch->dirty = 1;
++
++              memcpy (set->page, contents, mgr->page_size);
++
++              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
++                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
++
++              bt_releasemutex(mgr->lock);
                return 0;
        }
  
          fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
        bt_releasemutex(mgr->lock);
  
++      //      keep bt_txnpromote out of this page
++
++      contents->free = 1;
        contents->page_no = page_no;
        pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
  
        else
                return mgr->err;
  
++      // now pin will keep bt_txnpromote out
++
++      set->page->free = 0;
        return 0;
  }
  
@@@ -1622,10 -1622,10 +1684,11 @@@ uint good = 0
  
  int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
  {
--uid page_no = ROOT_page, prevpage = 0;
++uid page_no = ROOT_page, prevpage_no = 0;
  uint drill = 0xff, slot;
  BtLatchSet *prevlatch;
  uint mode, prevmode;
++BtPage prevpage;
  BtVal *val;
  
    //  start at root of btree and drill down
          bt_lockpage(BtLockAccess, set->latch, thread_no);
  
        set->page = bt_mappage (mgr, set->latch);
++if( set->latch->promote )
++abort();
  
        //      release & unpin parent or left sibling page
  
--      if( prevpage ) {
++      if( prevpage_no ) {
          bt_unlockpage(prevmode, prevlatch);
          bt_unpinlatch (mgr, prevlatch);
--        prevpage = 0;
++        prevpage_no = 0;
        }
  
        // obtain mode lock using lock chaining through AccessLock
                }
        }
  
--      prevpage = set->latch->page_no;
++      prevpage_no = set->latch->page_no;
        prevlatch = set->latch;
++      prevpage = set->page;
        prevmode = mode;
  
        //  find key on page at this level
  
  //    return page to free list
  //    page must be delete & write locked
++//    and have no keys pointing to it.
  
  void bt_freepage (BtMgr *mgr, BtPageSet *set)
  {
  
        memcpy(set->page->right, mgr->pagezero->freechain, BtId);
        bt_putid(mgr->pagezero->freechain, set->latch->page_no);
++      set->latch->promote = 0;
        set->latch->dirty = 1;
        set->page->free = 1;
  
        // decrement active page count
  
        mgr->pagezero->activepages--;
++
        if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
          fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
  
@@@ -1845,7 -1845,7 +1914,7 @@@ uint idx
  //    returns with page removed
  //    from the page pool.
  
--BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
++BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, int delkey)
  {
  unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
  unsigned char value[BtId];
@@@ -1854,7 -1854,7 +1923,6 @@@ BtPageSet right[1]
  uid page_no;
  BtKey *ptr;
  
--
        //      cache copy of fence key
        //      to remove in parent
  
  
        ptr = (BtKey*)lowerfence;
  
--      if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
++      if( delkey )
++       if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
          return mgr->err;
  
        //      obtain delete and write locks to right node
@@@ -1978,9 -1978,9 +2047,12 @@@ BtVal *val
                        break;
          }
  
++      if( !found )
++              return 0;
++
        //      did we delete a fence key in an upper level?
  
--      if( found && lvl && set->page->act && fence )
++      if( lvl && set->page->act && fence )
          if( bt_fixfence (mgr, set, lvl, thread_no) )
                return mgr->err;
          else
  
        //      do we need to collapse root?
  
--      if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
++      if( set->latch->page_no == ROOT_page && set->page->act == 1 )
          if( bt_collapseroot (mgr, set, thread_no) )
                return mgr->err;
          else
  
        //      delete empty page
  
--      if( !set->page->act ) {
--        if( bt_deletepage (mgr, set, thread_no) )
--              return mgr->err;
--      } else {
--        set->latch->dirty = 1;
--        bt_unlockpage(BtLockWrite, set->latch);
--        bt_unpinlatch (mgr, set->latch);
--      }
++      if( !set->page->act )
++        return bt_deletepage (mgr, set, thread_no, 1);
  
++      set->latch->dirty = 1;
++      bt_unlockpage(BtLockWrite, set->latch);
++      bt_unpinlatch (mgr, set->latch);
        return 0;
  }
  
@@@ -2270,7 -2270,7 +2339,7 @@@ BtVal *val
  //  split already locked full node
  //    leave it locked.
  //    return pool entry for new right
--//    page, unlocked
++//    page, pinned & unlocked
  
  uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
  {
@@@ -2385,8 -2385,8 +2454,8 @@@ uint prev
  }
  
  //    fix keys for newly split page
--//    call with page locked, return
--//    unlocked
++//    call with both pages pinned & locked
++//  return unlocked and unpinned
  
  BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
  {
@@@ -2706,13 -2706,13 +2775,15 @@@ uint entry, slot
          return 0;
        }
  
++      //  split page
++
        if( entry = bt_splitpage (mgr, set, thread_no) )
          latch = mgr->latchsets + entry;
        else
          return mgr->err;
  
        //      splice right page into split chain
--      //      and WriteLock it.
++      //      and WriteLock it
  
        bt_lockpage(BtLockWrite, latch, thread_no);
        latch->split = set->latch->split;
@@@ -2763,99 -2763,99 +2834,15 @@@ BtVal *val
        return 0;
  }
  
--//    delete an empty master page for a transaction
--
--//    note that the far right page never empties because
--//    it always contains (at least) the infinite stopper key
--//    and that all pages that don't contain any keys are
--//    deleted, or are being held under Atomic lock.
--
--BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
++int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
  {
--BtPageSet right[1], temp[1];
--unsigned char value[BtId];
--uid right_page_no;
--BtKey *ptr;
--
--      bt_lockpage(BtLockWrite, prev->latch, thread_no);
--
--      //      grab the right sibling
--
--      if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
--              right->page = bt_mappage (mgr, right->latch);
--      else
--              return mgr->err;
--
--      bt_lockpage(BtLockAtomic, right->latch, thread_no);
--      bt_lockpage(BtLockWrite, right->latch, thread_no);
--
--      //      and pull contents over empty page
--      //      while preserving master's left link
--
--      memcpy (right->page->left, prev->page->left, BtId);
--      memcpy (prev->page, right->page, mgr->page_size);
--
--      //      forward seekers to old right sibling
--      //      to new page location in set
--
--      bt_putid (right->page->right, prev->latch->page_no);
--      right->latch->dirty = 1;
--      right->page->kill = 1;
--
--      //      remove pointer to right page for searchers by
--      //      changing right fence key to point to the master page
--
--      ptr = keyptr(right->page,right->page->cnt);
--      bt_putid (value, prev->latch->page_no);
--
--      if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
--              return mgr->err;
--
--      //  now that master page is in good shape we can
--      //      remove its locks.
--
--      bt_unlockpage (BtLockAtomic, prev->latch);
--      bt_unlockpage (BtLockWrite, prev->latch);
--
--      //  fix master's right sibling's left pointer
--      //      to remove scanner's poiner to the right page
--
--      if( right_page_no = bt_getid (prev->page->right) ) {
--        if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
--              temp->page = bt_mappage (mgr, temp->latch);
--        else
--              return mgr->err;
--
--        bt_lockpage (BtLockWrite, temp->latch, thread_no);
--        bt_putid (temp->page->left, prev->latch->page_no);
--        temp->latch->dirty = 1;
--
--        bt_unlockpage (BtLockWrite, temp->latch);
--        bt_unpinlatch (mgr, temp->latch);
--      } else {        // master is now the far right page
--        bt_mutexlock (mgr->lock);
--        bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
--        bt_releasemutex(mgr->lock);
--      }
++BtKey *key1 = (BtKey *)((char *)page + slot1->off);
++BtKey *key2 = (BtKey *)((char *)page + slot2->off);
  
--      //      now that there are no pointers to the right page
--      //      we can delete it after the last read access occurs
--
--      bt_unlockpage (BtLockWrite, right->latch);
--      bt_unlockpage (BtLockAtomic, right->latch);
--      bt_lockpage (BtLockDelete, right->latch, thread_no);
--      bt_lockpage (BtLockWrite, right->latch, thread_no);
--      bt_freepage (mgr, right);
--      return 0;
++      return keycmp (key1, key2->key, key2->len);
  }
--
  //    atomic modification of a batch of keys.
  
--//    return -1 if BTERR is set
--//    otherwise return slot number
--//    causing the key constraint violation
--//    or zero on successful completion.
--
  BTERR bt_atomictxn (BtDb *bt, BtPage source)
  {
  uint src, idx, slot, samepage, entry, que = 0;
@@@ -2867,7 -2867,7 +2854,7 @@@ int type
  
    // stable sort the list of keys into order to
    //  prevent deadlocks between threads.
--
++/*
    for( src = 1; src++ < source->cnt; ) {
        *temp = *slotptr(source,src);
        key = keyptr (source,src);
                break;
        }
    }
--
++*/
++      qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
    //  add entries to redo log
  
    if( bt->mgr->pagezero->redopages )
  BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
  {
  uint src, idx, slot, samepage, entry, que = 0;
--AtomicKey *head, *tail, *leaf;
  BtPageSet set[1], prev[1];
  unsigned char value[BtId];
  BtLatchSet *latch;
@@@ -2923,9 -2923,9 +2910,6 @@@ uid right
  
    locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
  
--  head = NULL;
--  tail = NULL;
--
    // Load the leaf page for each key
    // group same page references with reuse bit
    // and determine any constraint violations
        if( samepage = src > 1 )
          if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
                slot = bt_findslot(set->page, key->key, key->len);
--        else
--              bt_unlockpage(BtLockRead, set->latch); 
  
        if( !slot )
--        if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, thread_no) )
++        if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic, thread_no) )
                set->latch->split = 0;
          else
                return mgr->err;
        locks[src].reqlsn = set->page->lsn;
    }
  
--  //  unlock last loadpage lock
--
--  if( source->cnt )
--      bt_unlockpage(BtLockRead, set->latch);
--
    //  obtain write lock for each master page
--  //  sync flushed pages to disk
  
    for( src = 0; src++ < source->cnt; ) {
        if( locks[src].reuse )
  
    // insert or delete each key
    // process any splits or merges
--  // release Write & Atomic latches
--  // set ParentModifications and build
--  // queue of keys to insert for split pages
--  // or delete for deleted pages.
--
    // run through txn list backwards
  
    samepage = source->cnt + 1;
        samepage = src;
  
        //  pick-up all splits from master page
--      //      each one is already WriteLocked.
--
--      entry = prev->latch->split;
++      //      each one is already pinned & WriteLocked.
  
--      while( entry ) {
++      if( entry = prev->latch->split ) do {
          set->latch = mgr->latchsets + entry;
          set->page = bt_mappage (mgr, set->latch);
--        entry = set->latch->split;
  
          // delete empty master page by undoing its split
          //  (this is potentially another empty page)
--        //  note that there are no new left pointers yet
++        //  note that there are no pointers to it yet
  
          if( !prev->page->act ) {
                memcpy (set->page->left, prev->page->left, BtId);
                memcpy (prev->page, set->page, mgr->page_size);
                bt_lockpage (BtLockDelete, set->latch, thread_no);
--              bt_freepage (mgr, set);
--
                prev->latch->split = set->latch->split;
                prev->latch->dirty = 1;
++              bt_freepage (mgr, set);
                continue;
          }
  
--        // remove empty page from the split chain
--        // and return it to the free list.
++        // remove empty split page from the split chain
++        // and return it to the free list. No other
++        // thread has its page number yet.
  
          if( !set->page->act ) {
                memcpy (prev->page->right, set->page->right, BtId);
                prev->latch->split = set->latch->split;
++
                bt_lockpage (BtLockDelete, set->latch, thread_no);
                bt_freepage (mgr, set);
                continue;
          }
  
--        //  schedule prev fence key update
++        //  update prev's fence key
  
          ptr = keyptr(prev->page,prev->page->cnt);
--        leaf = calloc (sizeof(AtomicKey), 1), que++;
--
--        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
--        leaf->entry = prev->latch - mgr->latchsets;
--        leaf->page_no = prev->latch->page_no;
--        leaf->type = 0;
--
--        if( tail )
--              tail->next = leaf;
--        else
--              head = leaf;
++        bt_putid (value, prev->latch->page_no);
  
--        tail = leaf;
++        if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
++              return mgr->err;
  
          // splice in the left link into the split page
  
          bt_putid (set->page->left, prev->latch->page_no);
--        bt_lockpage(BtLockParent, prev->latch, thread_no);
--        bt_unlockpage(BtLockWrite, prev->latch);
++
          if( lsm )
                bt_syncpage (mgr, prev->page, prev->latch);
++
++        // page is unpinned below to avoid bt_txnpromote
++
++        bt_unlockpage(BtLockWrite, prev->latch);
          *prev = *set;
--      }
++      } while( entry = prev->latch->split );
  
        //  update left pointer in next right page from last split page
--      //      (if all splits were reversed, latch->split == 0)
++      //      (if all splits were reversed or none occurred, latch->split == 0)
  
        if( latch->split ) {
          //  fix left pointer in master's original (now split)
            bt_lockpage (BtLockWrite, set->latch, thread_no);
            bt_putid (set->page->left, prev->latch->page_no);
                set->latch->dirty = 1;
++
            bt_unlockpage (BtLockWrite, set->latch);
                bt_unpinlatch (mgr, set->latch);
          } else {      // prev is rightmost page
          }
  
          //  Process last page split in chain
++        //  by switching the key from the master
++        //  page to the last split.
  
          ptr = keyptr(prev->page,prev->page->cnt);
--        leaf = calloc (sizeof(AtomicKey), 1), que++;
--
--        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
--        leaf->entry = prev->latch - mgr->latchsets;
--        leaf->page_no = prev->latch->page_no;
--        leaf->type = 0;
--  
--        if( tail )
--              tail->next = leaf;
--        else
--              head = leaf;
++        bt_putid (value, prev->latch->page_no);
  
--        tail = leaf;
++        if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
++              return mgr->err;
  
--        bt_lockpage(BtLockParent, prev->latch, thread_no);
          bt_unlockpage(BtLockWrite, prev->latch);
  
          if( lsm )
                bt_syncpage (mgr, prev->page, prev->latch);
  
--        //  remove atomic lock on master page
--
          bt_unlockpage(BtLockAtomic, latch);
++        bt_unpinlatch(mgr, latch);
++
++        // go through the list of splits and
++        // release the latch pins
++
++        while( entry = latch->split ) {
++              latch = mgr->latchsets + entry;
++              bt_unpinlatch(mgr, latch);
++        }
++
          continue;
        }
  
--      //  finished if prev page occupied (either master or final split)
++      //      since there are no splits, we're
++      //  finished if master page occupied
  
        if( prev->page->act ) {
--        bt_unlockpage(BtLockWrite, latch);
--        bt_unlockpage(BtLockAtomic, latch);
++        bt_unlockpage(BtLockAtomic, prev->latch);
++        bt_unlockpage(BtLockWrite, prev->latch);
  
          if( lsm )
--              bt_syncpage (mgr, prev->page, latch);
++              bt_syncpage (mgr, prev->page, prev->latch);
  
--        bt_unpinlatch(mgr, latch);
++        bt_unpinlatch(mgr, prev->latch);
          continue;
        }
  
        // any and all splits were reversed, and the
        // master page located in prev is empty, delete it
--      // by pulling over master's right sibling.
--
--      // Remove empty master's fence key
--
--      ptr = keyptr(prev->page,prev->page->cnt);
--
--      if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) )
--              return mgr->err;
--
--      //      perform the remainder of the delete
--      //      from the FIFO queue
--
--      leaf = calloc (sizeof(AtomicKey), 1), que++;
--
--      memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
--      leaf->entry = prev->latch - mgr->latchsets;
--      leaf->page_no = prev->latch->page_no;
--      leaf->nounlock = 1;
--      leaf->type = 2;
--  
--      if( tail )
--        tail->next = leaf;
--      else
--        head = leaf;
--
--      tail = leaf;
--
--      //      leave atomic lock in place until
--      //      deletion completes in next phase.
--
--      bt_unlockpage(BtLockWrite, prev->latch);
--
--      if( lsm )
--        bt_syncpage (mgr, prev->page, prev->latch);
  
++      if( bt_deletepage (mgr, prev, thread_no, 1) )
++              return mgr->err;
    }
  
--  //  add & delete keys for any pages split or merged during transaction
--
--  if( leaf = head )
--    do {
--        set->latch = mgr->latchsets + leaf->entry;
--        set->page = bt_mappage (mgr, set->latch);
--
--        bt_putid (value, leaf->page_no);
--        ptr = (BtKey *)leaf->leafkey;
--
--        switch( leaf->type ) {
--        case 0:       // insert key
--          if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
--                return mgr->err;
--
--              break;
--
--        case 1:       // delete key
--              if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) )
--                return mgr->err;
--
--              break;
--
--        case 2:       // free page
--              if( bt_atomicfree (mgr, set, thread_no) )
--                return mgr->err;
--
--              break;
--        }
--
--        if( !leaf->nounlock )
--          bt_unlockpage (BtLockParent, set->latch);
--
--        bt_unpinlatch (mgr, set->latch);
--        tail = leaf->next;
--        free (leaf);
--      } while( leaf = tail );
--
    free (locks);
    return 0;
  }
@@@ -3262,7 -3262,7 +3157,6 @@@ BtVal *val
                continue;
  
        set->latch = bt->mgr->latchsets + entry;
--      idx = set->latch->page_no % bt->mgr->latchhash;
  
        if( !bt_mutextry(set->latch->modify) )
                continue;
          continue;
        }
  
--      bt_lockpage (BtLockAtomic, set->latch, bt->thread_no);
--      bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
--
--      // entry interiour node or being or was killed
++      // entry interiour node or being killed or promoted
  
        if( set->page->lvl || set->page->free || set->page->kill ) {
          bt_releasemutex(set->latch->modify);
--      bt_unlockpage(BtLockAtomic, set->latch);
--      bt_unlockpage(BtLockWrite, set->latch);
          continue;
        }
  
        //  pin the page for our useage
  
        set->latch->pin++;
++      set->latch->promote = 1;
        bt_releasemutex(set->latch->modify);
  
++      bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
++
++      //  remove the key for the page
++      //      and wait for other threads to fade away
++
++      ptr = keyptr(set->page, set->page->cnt);
++
++      if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
++              return bt->mgr->err;
++
++      bt_unlockpage (BtLockWrite, set->latch);
++while( (set->latch->pin & ~CLOCK_bit) > 1 )
++sched_yield();
++      bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
++      bt_lockpage (BtLockAtomic, set->latch, bt->thread_no);
++      bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
++
        // transfer slots in our selected page to larger btree
  if( !(set->latch->page_no % 100) )
  fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
  
--      if( bt_atomicexec (bt->main, set->page, 0, 1, bt->thread_no) )
++      if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
                return bt->main->err;
  
--    bt_unlockpage(BtLockAtomic, set->latch);
--
        //  now delete the page
  
--      if( bt_deletepage (bt->mgr, set, bt->thread_no) )
--              return bt->mgr->err;
--
--      return 0;
++      bt_unlockpage (BtLockDelete, set->latch);
++      bt_unlockpage (BtLockAtomic, set->latch);
++      return bt_deletepage (bt->mgr, set, bt->thread_no, 0);
    }
  }
  
diff --cc threadskv10b.c
index 825b5f5175ba8d35bc7bb682f857941ac8f6a114,fa68f22afc5ed814f07cf0a3d5bd4d9d6bfa5961..09a657e3de7c1a543a688b61d2e94affea998f46
@@@ -2,7 -2,7 +2,6 @@@
  //    with reworked bt_deletekey code,
  //    phase-fair re-entrant reader writer locks,
  //    librarian page split code,
--//    duplicate key management
  //    bi-directional cursors
  //    traditional buffer pool manager
  //    ACID batched key-value updates