// redo log for failure recovery
// and LSM B-trees for write optimization
--// 11 OCT 2014
++// 15 OCT 2014
// author: karl malbrain, malbrain@cal.berkeley.edu
// phase-fair reader/writer lock word: the writer-exclusive bit and
// the writer-wait count share the single word value[0] so both can
// be updated with one atomic operation; the fields become
// one-element arrays so they are always referenced as pointers
typedef struct {
union {
struct {
-- volatile ushort xlock; // one writer has exclusive lock
-- volatile ushort wrt; // count of other writers waiting
++ volatile ushort xlock[1]; // one writer has exclusive lock
++ volatile ushort wrt[1]; // count of other writers waiting
} bits[1];
uint value[1];
};
volatile ushort serving[1];
} RWLock;
--// write only lock
++// write only reentrant lock
// write-only reentrant lock: xcl guards the fields, tid/dup record
// the owning thread and its re-entry count, value[0] overlays them
// as the futex word, and waiters counts threads sleeping on it
typedef struct {
-- BtMutexLatch xcl[1];
-- volatile ushort tid[1];
-- volatile ushort dup[1];
++ BtMutexLatch xcl[1];
++ union {
++ struct {
++ volatile ushort tid[1]; // owner thread id, zero when free
++ volatile ushort dup[1]; // re-entry count by owner thread
++ } bits[1];
++ uint value[1];
++ };
++ volatile uint waiters[1]; // threads sleeping in futex wait
} WOLock;
#define PHID 0x1
uint next; // next entry in hash table chain
uint prev; // prev entry in hash table chain
ushort pin; // number of accessing threads
-- unsigned char dirty; // page in cache is dirty (atomic set)
++ unsigned char dirty; // page in cache is dirty (atomic setable)
++ unsigned char promote; // page is being promoted to the main btree (atomic setable)
BtMutexLatch modify[1]; // modify entry lite latch
} BtLatchSet;
uint reuse:1; // reused previous page
} AtomicTxn;
--typedef struct {
-- uid page_no; // page number for split leaf
-- void *next; // next key to insert
-- uint entry:29; // latch table entry number
-- uint type:2; // 0 == insert, 1 == delete, 2 == free
-- uint nounlock:1; // don't unlock ParentModification
-- unsigned char leafkey[BT_keyarray];
--} AtomicKey;
--
// Catastrophic errors
typedef enum {
while( 1 ) {
*prev->value = __sync_fetch_and_or(latch->value, XCL);
-- if( !prev->bits->xlock ) { // did we set XCL?
++ if( !*prev->bits->xlock ) { // did we set XCL?
if( slept )
__sync_fetch_and_sub(latch->value, WRT);
return;
}
if( !slept ) {
-- prev->bits->wrt++;
++ *prev->bits->wrt += 1;
__sync_fetch_and_add(latch->value, WRT);
}
// take write access if exclusive bit was clear
-- return !prev->bits->xlock;
++ return !*prev->bits->xlock;
}
// clear write mode
*prev->value = __sync_fetch_and_and(latch->value, ~XCL);
-- if( prev->bits->wrt )
++ if( *prev->bits->wrt )
sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
}
--// Write-Only Queue Lock
++// Write-Only Reentrant Lock
// obtain write-only lock for thread tid, reentrantly.
// if the lock is held elsewhere the caller registers in waiters
// and sleeps on value[0] with a futex until the holder releases.
void WriteOLock (WOLock *lock, ushort tid)
{
++uint prev, waited = 0;
++
while( 1 ) {
bt_mutexlock(lock->xcl);
-- if( *lock->tid == tid ) {
-- *lock->dup += 1;
++ if( waited ) // back from futex sleep: deregister as waiter
++ *lock->waiters -= 1;
++
++ if( *lock->bits->tid == tid ) { // already ours: bump re-entry count
++ *lock->bits->dup += 1;
bt_releasemutex(lock->xcl);
return;
}
-- if( !*lock->tid ) {
-- *lock->tid = tid;
++ if( !*lock->bits->tid ) { // free: claim ownership
++ *lock->bits->tid = tid;
bt_releasemutex(lock->xcl);
return;
}
++
++ waited = 1;
++ *lock->waiters += 1;
++ prev = *lock->value; // sample lock word under the mutex
++
bt_releasemutex(lock->xcl);
-- sched_yield();
++
++ sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, prev, NULL, NULL, QueWr ); // sleep unless lock word already changed from prev
}
}
// release write-only lock: unwind one re-entry if dup is set,
// otherwise clear ownership and futex-wake the waiting threads
void WriteORelease (WOLock *lock)
{
-- if( *lock->dup ) {
-- *lock->dup -= 1;
-- return;
++ bt_mutexlock(lock->xcl);
++
++ if( *lock->bits->dup ) { // reentrant acquisition: just count down
++ *lock->bits->dup -= 1;
++ bt_releasemutex(lock->xcl);
++ return;
++ }
++
++ *lock->bits->tid = 0; // lock is now free
++
++ if( *lock->waiters ) // wake every sleeper; they re-contend in WriteOLock
++ sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueWr );
++ bt_releasemutex(lock->xcl);
++}
++
++// clear lock of holders and waiters:
++// spin (yielding) until no thread owns the lock and no
++// thread is registered as a waiter, then return.
++// note: explicit void return type -- implicit int is
++// invalid since C99 and the function returns no value.
++void ClearWOLock (WOLock *lock)
++{
++ while( 1 ) {
++ bt_mutexlock(lock->xcl);
++
++ if( *lock->waiters ) { // threads still sleeping on the lock
++ bt_releasemutex(lock->xcl);
++ sched_yield();
++ continue;
++ }
++
++ if( *lock->bits->tid ) { // still owned by some thread
++ bt_releasemutex(lock->xcl);
++ sched_yield();
++ continue;
}
-- *lock->tid = 0;
++ bt_releasemutex(lock->xcl);
++ return;
++ }
}
// Phase-Fair reader/writer lock implementation
WriteOLock (latch->atomic, thread_no);
ReadLock (latch->readwr, thread_no);
break;
++ case BtLockAtomic | BtLockWrite:
++ WriteOLock (latch->atomic, thread_no);
++ WriteLock (latch->readwr, thread_no);
++ break;
}
}
WriteORelease (latch->atomic);
ReadRelease (latch->readwr);
break;
++ case BtLockAtomic | BtLockWrite:
++ WriteORelease (latch->atomic);
++ WriteRelease (latch->readwr);
++ break;
}
}
bt_mutexlock(mgr->lock);
// use empty chain first
-- // else allocate empty page
++ // else allocate new page
if( page_no = bt_getid(mgr->pagezero->freechain) ) {
if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
mgr->pagezero->activepages++;
bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
-- if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
-- fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
-- bt_releasemutex(mgr->lock);
++ // the page is currently free and this
++ // will keep bt_txnpromote out.
++
++ // contents will replace this bit
++ // and pin will keep bt_txnpromote out
contents->page_no = page_no;
-- memcpy (set->page, contents, mgr->page_size);
set->latch->dirty = 1;
++
++ memcpy (set->page, contents, mgr->page_size);
++
++ if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
++ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
++
++ bt_releasemutex(mgr->lock);
return 0;
}
fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
bt_releasemutex(mgr->lock);
++ // keep bt_txnpromote out of this page
++
++ contents->free = 1;
contents->page_no = page_no;
pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
else
return mgr->err;
++ // now pin will keep bt_txnpromote out
++
++ set->page->free = 0;
return 0;
}
int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
{
--uid page_no = ROOT_page, prevpage = 0;
++uid page_no = ROOT_page, prevpage_no = 0;
uint drill = 0xff, slot;
BtLatchSet *prevlatch;
uint mode, prevmode;
++BtPage prevpage;
BtVal *val;
// start at root of btree and drill down
bt_lockpage(BtLockAccess, set->latch, thread_no);
set->page = bt_mappage (mgr, set->latch);
++if( set->latch->promote )
++abort();
// release & unpin parent or left sibling page
-- if( prevpage ) {
++ if( prevpage_no ) {
bt_unlockpage(prevmode, prevlatch);
bt_unpinlatch (mgr, prevlatch);
-- prevpage = 0;
++ prevpage_no = 0;
}
// obtain mode lock using lock chaining through AccessLock
}
}
-- prevpage = set->latch->page_no;
++ prevpage_no = set->latch->page_no;
prevlatch = set->latch;
++ prevpage = set->page;
prevmode = mode;
// find key on page at this level
// return page to free list
// page must be delete & write locked
++// and have no keys pointing to it.
void bt_freepage (BtMgr *mgr, BtPageSet *set)
{
memcpy(set->page->right, mgr->pagezero->freechain, BtId);
bt_putid(mgr->pagezero->freechain, set->latch->page_no);
++ set->latch->promote = 0;
set->latch->dirty = 1;
set->page->free = 1;
// decrement active page count
mgr->pagezero->activepages--;
++
if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
// returns with page removed
// from the page pool.
--BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
++BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, int delkey)
{
unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
unsigned char value[BtId];
uid page_no;
BtKey *ptr;
--
// cache copy of fence key
// to remove in parent
ptr = (BtKey*)lowerfence;
-- if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
++ if( delkey )
++ if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
return mgr->err;
// obtain delete and write locks to right node
break;
}
++ if( !found )
++ return 0;
++
// did we delete a fence key in an upper level?
-- if( found && lvl && set->page->act && fence )
++ if( lvl && set->page->act && fence )
if( bt_fixfence (mgr, set, lvl, thread_no) )
return mgr->err;
else
// do we need to collapse root?
-- if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
++ if( set->latch->page_no == ROOT_page && set->page->act == 1 )
if( bt_collapseroot (mgr, set, thread_no) )
return mgr->err;
else
// delete empty page
-- if( !set->page->act ) {
-- if( bt_deletepage (mgr, set, thread_no) )
-- return mgr->err;
-- } else {
-- set->latch->dirty = 1;
-- bt_unlockpage(BtLockWrite, set->latch);
-- bt_unpinlatch (mgr, set->latch);
-- }
++ if( !set->page->act )
++ return bt_deletepage (mgr, set, thread_no, 1);
++ set->latch->dirty = 1;
++ bt_unlockpage(BtLockWrite, set->latch);
++ bt_unpinlatch (mgr, set->latch);
return 0;
}
// split already locked full node
// leave it locked.
// return pool entry for new right
--// page, unlocked
++// page, pinned & unlocked
uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
{
}
// fix keys for newly split page
--// call with page locked, return
--// unlocked
++// call with both pages pinned & locked
++// return unlocked and unpinned
BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
{
return 0;
}
++ // split page
++
if( entry = bt_splitpage (mgr, set, thread_no) )
latch = mgr->latchsets + entry;
else
return mgr->err;
// splice right page into split chain
-- // and WriteLock it.
++ // and WriteLock it
bt_lockpage(BtLockWrite, latch, thread_no);
latch->split = set->latch->split;
return 0;
}
--// delete an empty master page for a transaction
--
--// note that the far right page never empties because
--// it always contains (at least) the infinite stopper key
--// and that all pages that don't contain any keys are
--// deleted, or are being held under Atomic lock.
--
--BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
++// slot comparator for qsort_r: orders two slots by comparing the
++// keys their offsets reference within page (used to sort an atomic
++// transaction's key list to prevent deadlocks between threads)
++int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
{
--BtPageSet right[1], temp[1];
--unsigned char value[BtId];
--uid right_page_no;
--BtKey *ptr;
--
-- bt_lockpage(BtLockWrite, prev->latch, thread_no);
--
-- // grab the right sibling
--
-- if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
-- right->page = bt_mappage (mgr, right->latch);
-- else
-- return mgr->err;
--
-- bt_lockpage(BtLockAtomic, right->latch, thread_no);
-- bt_lockpage(BtLockWrite, right->latch, thread_no);
--
-- // and pull contents over empty page
-- // while preserving master's left link
--
-- memcpy (right->page->left, prev->page->left, BtId);
-- memcpy (prev->page, right->page, mgr->page_size);
--
-- // forward seekers to old right sibling
-- // to new page location in set
--
-- bt_putid (right->page->right, prev->latch->page_no);
-- right->latch->dirty = 1;
-- right->page->kill = 1;
--
-- // remove pointer to right page for searchers by
-- // changing right fence key to point to the master page
--
-- ptr = keyptr(right->page,right->page->cnt);
-- bt_putid (value, prev->latch->page_no);
--
-- if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
-- return mgr->err;
--
-- // now that master page is in good shape we can
-- // remove its locks.
--
-- bt_unlockpage (BtLockAtomic, prev->latch);
-- bt_unlockpage (BtLockWrite, prev->latch);
--
-- // fix master's right sibling's left pointer
-- // to remove scanner's poiner to the right page
--
-- if( right_page_no = bt_getid (prev->page->right) ) {
-- if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
-- temp->page = bt_mappage (mgr, temp->latch);
-- else
-- return mgr->err;
--
-- bt_lockpage (BtLockWrite, temp->latch, thread_no);
-- bt_putid (temp->page->left, prev->latch->page_no);
-- temp->latch->dirty = 1;
--
-- bt_unlockpage (BtLockWrite, temp->latch);
-- bt_unpinlatch (mgr, temp->latch);
-- } else { // master is now the far right page
-- bt_mutexlock (mgr->lock);
-- bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
-- bt_releasemutex(mgr->lock);
-- }
++// locate each key through its slot's offset into the page
++BtKey *key1 = (BtKey *)((char *)page + slot1->off);
++BtKey *key2 = (BtKey *)((char *)page + slot2->off);
-- // now that there are no pointers to the right page
-- // we can delete it after the last read access occurs
--
-- bt_unlockpage (BtLockWrite, right->latch);
-- bt_unlockpage (BtLockAtomic, right->latch);
-- bt_lockpage (BtLockDelete, right->latch, thread_no);
-- bt_lockpage (BtLockWrite, right->latch, thread_no);
-- bt_freepage (mgr, right);
-- return 0;
++ return keycmp (key1, key2->key, key2->len);
}
--
// atomic modification of a batch of keys.
--// return -1 if BTERR is set
--// otherwise return slot number
--// causing the key constraint violation
--// or zero on successful completion.
--
BTERR bt_atomictxn (BtDb *bt, BtPage source)
{
uint src, idx, slot, samepage, entry, que = 0;
// stable sort the list of keys into order to
// prevent deadlocks between threads.
--
++/*
for( src = 1; src++ < source->cnt; ) {
*temp = *slotptr(source,src);
key = keyptr (source,src);
break;
}
}
--
++*/
++ qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
// add entries to redo log
if( bt->mgr->pagezero->redopages )
BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
{
uint src, idx, slot, samepage, entry, que = 0;
--AtomicKey *head, *tail, *leaf;
BtPageSet set[1], prev[1];
unsigned char value[BtId];
BtLatchSet *latch;
locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
-- head = NULL;
-- tail = NULL;
--
// Load the leaf page for each key
// group same page references with reuse bit
// and determine any constraint violations
if( samepage = src > 1 )
if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
slot = bt_findslot(set->page, key->key, key->len);
-- else
-- bt_unlockpage(BtLockRead, set->latch);
if( !slot )
-- if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, thread_no) )
++ if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic, thread_no) )
set->latch->split = 0;
else
return mgr->err;
locks[src].reqlsn = set->page->lsn;
}
-- // unlock last loadpage lock
--
-- if( source->cnt )
-- bt_unlockpage(BtLockRead, set->latch);
--
// obtain write lock for each master page
-- // sync flushed pages to disk
for( src = 0; src++ < source->cnt; ) {
if( locks[src].reuse )
// insert or delete each key
// process any splits or merges
-- // release Write & Atomic latches
-- // set ParentModifications and build
-- // queue of keys to insert for split pages
-- // or delete for deleted pages.
--
// run through txn list backwards
samepage = source->cnt + 1;
samepage = src;
// pick-up all splits from master page
-- // each one is already WriteLocked.
--
-- entry = prev->latch->split;
++ // each one is already pinned & WriteLocked.
-- while( entry ) {
++ if( entry = prev->latch->split ) do {
set->latch = mgr->latchsets + entry;
set->page = bt_mappage (mgr, set->latch);
-- entry = set->latch->split;
// delete empty master page by undoing its split
// (this is potentially another empty page)
-- // note that there are no new left pointers yet
++ // note that there are no pointers to it yet
if( !prev->page->act ) {
memcpy (set->page->left, prev->page->left, BtId);
memcpy (prev->page, set->page, mgr->page_size);
bt_lockpage (BtLockDelete, set->latch, thread_no);
-- bt_freepage (mgr, set);
--
prev->latch->split = set->latch->split;
prev->latch->dirty = 1;
++ bt_freepage (mgr, set);
continue;
}
-- // remove empty page from the split chain
-- // and return it to the free list.
++ // remove empty split page from the split chain
++ // and return it to the free list. No other
++ // thread has its page number yet.
if( !set->page->act ) {
memcpy (prev->page->right, set->page->right, BtId);
prev->latch->split = set->latch->split;
++
bt_lockpage (BtLockDelete, set->latch, thread_no);
bt_freepage (mgr, set);
continue;
}
-- // schedule prev fence key update
++ // update prev's fence key
ptr = keyptr(prev->page,prev->page->cnt);
-- leaf = calloc (sizeof(AtomicKey), 1), que++;
--
-- memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
-- leaf->entry = prev->latch - mgr->latchsets;
-- leaf->page_no = prev->latch->page_no;
-- leaf->type = 0;
--
-- if( tail )
-- tail->next = leaf;
-- else
-- head = leaf;
++ bt_putid (value, prev->latch->page_no);
-- tail = leaf;
++ if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
++ return mgr->err;
// splice in the left link into the split page
bt_putid (set->page->left, prev->latch->page_no);
-- bt_lockpage(BtLockParent, prev->latch, thread_no);
-- bt_unlockpage(BtLockWrite, prev->latch);
++
if( lsm )
bt_syncpage (mgr, prev->page, prev->latch);
++
++ // page is unpinned below to avoid bt_txnpromote
++
++ bt_unlockpage(BtLockWrite, prev->latch);
*prev = *set;
-- }
++ } while( entry = prev->latch->split );
// update left pointer in next right page from last split page
-- // (if all splits were reversed, latch->split == 0)
++ // (if all splits were reversed or none occurred, latch->split == 0)
if( latch->split ) {
// fix left pointer in master's original (now split)
bt_lockpage (BtLockWrite, set->latch, thread_no);
bt_putid (set->page->left, prev->latch->page_no);
set->latch->dirty = 1;
++
bt_unlockpage (BtLockWrite, set->latch);
bt_unpinlatch (mgr, set->latch);
} else { // prev is rightmost page
}
// Process last page split in chain
++ // by switching the key from the master
++ // page to the last split.
ptr = keyptr(prev->page,prev->page->cnt);
-- leaf = calloc (sizeof(AtomicKey), 1), que++;
--
-- memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
-- leaf->entry = prev->latch - mgr->latchsets;
-- leaf->page_no = prev->latch->page_no;
-- leaf->type = 0;
--
-- if( tail )
-- tail->next = leaf;
-- else
-- head = leaf;
++ bt_putid (value, prev->latch->page_no);
-- tail = leaf;
++ if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
++ return mgr->err;
-- bt_lockpage(BtLockParent, prev->latch, thread_no);
bt_unlockpage(BtLockWrite, prev->latch);
if( lsm )
bt_syncpage (mgr, prev->page, prev->latch);
-- // remove atomic lock on master page
--
bt_unlockpage(BtLockAtomic, latch);
++ bt_unpinlatch(mgr, latch);
++
++ // go through the list of splits and
++ // release the latch pins
++
++ while( entry = latch->split ) {
++ latch = mgr->latchsets + entry;
++ bt_unpinlatch(mgr, latch);
++ }
++
continue;
}
-- // finished if prev page occupied (either master or final split)
++ // since there are no splits, we're
++ // finished if master page occupied
if( prev->page->act ) {
-- bt_unlockpage(BtLockWrite, latch);
-- bt_unlockpage(BtLockAtomic, latch);
++ bt_unlockpage(BtLockAtomic, prev->latch);
++ bt_unlockpage(BtLockWrite, prev->latch);
if( lsm )
-- bt_syncpage (mgr, prev->page, latch);
++ bt_syncpage (mgr, prev->page, prev->latch);
-- bt_unpinlatch(mgr, latch);
++ bt_unpinlatch(mgr, prev->latch);
continue;
}
// any and all splits were reversed, and the
// master page located in prev is empty, delete it
-- // by pulling over master's right sibling.
--
-- // Remove empty master's fence key
--
-- ptr = keyptr(prev->page,prev->page->cnt);
--
-- if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) )
-- return mgr->err;
--
-- // perform the remainder of the delete
-- // from the FIFO queue
--
-- leaf = calloc (sizeof(AtomicKey), 1), que++;
--
-- memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
-- leaf->entry = prev->latch - mgr->latchsets;
-- leaf->page_no = prev->latch->page_no;
-- leaf->nounlock = 1;
-- leaf->type = 2;
--
-- if( tail )
-- tail->next = leaf;
-- else
-- head = leaf;
--
-- tail = leaf;
--
-- // leave atomic lock in place until
-- // deletion completes in next phase.
--
-- bt_unlockpage(BtLockWrite, prev->latch);
--
-- if( lsm )
-- bt_syncpage (mgr, prev->page, prev->latch);
++ if( bt_deletepage (mgr, prev, thread_no, 1) )
++ return mgr->err;
}
-- // add & delete keys for any pages split or merged during transaction
--
-- if( leaf = head )
-- do {
-- set->latch = mgr->latchsets + leaf->entry;
-- set->page = bt_mappage (mgr, set->latch);
--
-- bt_putid (value, leaf->page_no);
-- ptr = (BtKey *)leaf->leafkey;
--
-- switch( leaf->type ) {
-- case 0: // insert key
-- if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
-- return mgr->err;
--
-- break;
--
-- case 1: // delete key
-- if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) )
-- return mgr->err;
--
-- break;
--
-- case 2: // free page
-- if( bt_atomicfree (mgr, set, thread_no) )
-- return mgr->err;
--
-- break;
-- }
--
-- if( !leaf->nounlock )
-- bt_unlockpage (BtLockParent, set->latch);
--
-- bt_unpinlatch (mgr, set->latch);
-- tail = leaf->next;
-- free (leaf);
-- } while( leaf = tail );
--
free (locks);
return 0;
}
continue;
set->latch = bt->mgr->latchsets + entry;
-- idx = set->latch->page_no % bt->mgr->latchhash;
if( !bt_mutextry(set->latch->modify) )
continue;
continue;
}
-- bt_lockpage (BtLockAtomic, set->latch, bt->thread_no);
-- bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
--
-- // entry interiour node or being or was killed
++ // entry is an interior node, or is being killed or promoted
if( set->page->lvl || set->page->free || set->page->kill ) {
bt_releasemutex(set->latch->modify);
-- bt_unlockpage(BtLockAtomic, set->latch);
-- bt_unlockpage(BtLockWrite, set->latch);
continue;
}
// pin the page for our useage
set->latch->pin++;
++ set->latch->promote = 1;
bt_releasemutex(set->latch->modify);
++ bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
++
++ // remove the key for the page
++ // and wait for other threads to fade away
++
++ ptr = keyptr(set->page, set->page->cnt);
++
++ if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
++ return bt->mgr->err;
++
++ bt_unlockpage (BtLockWrite, set->latch);
++while( (set->latch->pin & ~CLOCK_bit) > 1 )
++sched_yield();
++ bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
++ bt_lockpage (BtLockAtomic, set->latch, bt->thread_no);
++ bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
++
// transfer slots in our selected page to larger btree
if( !(set->latch->page_no % 100) )
fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
-- if( bt_atomicexec (bt->main, set->page, 0, 1, bt->thread_no) )
++ if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
return bt->main->err;
-- bt_unlockpage(BtLockAtomic, set->latch);
--
// now delete the page
-- if( bt_deletepage (bt->mgr, set, bt->thread_no) )
-- return bt->mgr->err;
--
-- return 0;
++ bt_unlockpage (BtLockDelete, set->latch);
++ bt_unlockpage (BtLockAtomic, set->latch);
++ return bt_deletepage (bt->mgr, set, bt->thread_no, 0);
}
}