-// foster btree version f2
-// 18 JAN 2014
+// foster btree version f
+// 29 JAN 2014
// author: karl malbrain, malbrain@cal.berkeley.edu
// the tod field from the key.
// Keys are marked dead, but remain on the page until
-// it cleanup is called. The fence key (highest key) for
+// cleanup is called. The fence key (highest key) for
// the page is always present, even after cleanup.
typedef struct {
uint mode; // read-write mode
#ifdef unix
int idx;
- char *pooladvise; // bit maps for pool page advisements
#else
HANDLE idx;
#endif
BtLatchSet *set; // current page latch set
BtPool *pool; // current page pool
unsigned char *mem; // frame, cursor, page memory buffer
+ int foster; // last search was to foster child
int found; // last delete was found
int err; // last error
} BtDb;
extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
extern uint bt_nextkey (BtDb *bt, uint slot);
+// internal functions
+BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no);
+uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot);
+BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl);
+
// manager functions
extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
void bt_mgrclose (BtMgr *mgr);
free (mgr->pool);
free (mgr->hash);
free (mgr->latch);
- free (mgr->pooladvise);
free (mgr);
#else
FlushFileBuffers(mgr->idx);
mgr->pool = calloc (poolmax, sizeof(BtPool));
mgr->hash = calloc (hashsize, sizeof(ushort));
mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
- mgr->pooladvise = calloc (poolmax, (mgr->poolmask + 8) / 8);
#else
mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
if( pool->map == MAP_FAILED )
return bt->err = BTERR_map;
- // clear out madvise issued bits
- memset (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8) / 8), 0, (bt->mgr->poolmask + 8)/8);
#else
flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
BtPage page;
page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
-#ifdef unix
- {
- uint idx = subpage / 8;
- uint bit = subpage % 8;
-
- if( ~((bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] >> bit) & 1 ) {
- madvise (page, bt->mgr->page_size, MADV_WILLNEED);
- (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] |= 1 << bit;
- }
- }
-#endif
return page;
}
int bt_findslot (BtDb *bt, unsigned char *key, uint len)
{
uint diff, higher = bt->page->cnt, low = 1, slot;
+uint good = 0;
- // low is the lowest candidate, higher is already
- // tested as .ge. the given key, loop ends when they meet
+ // if no right link
+ // make stopper key an infinite fence value
+ // by setting the good flag
+
+ if( bt_getid (bt->page->right) )
+ higher++;
+ else
+ good++;
+
+ // low is the next candidate.
+ // loop ends when they meet
+
+ // if good, higher is already
+ // tested as .ge. the given key.
while( diff = higher - low ) {
slot = low + ( diff >> 1 );
if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
low = slot + 1;
else
- higher = slot;
+ higher = slot, good++;
}
- return higher;
+ // return zero if key is on right link page
+
+ return good ? higher : 0;
}
// find and load page at given level for given key
// leave page rd or wr locked as requested
-int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, BtLock lock)
+uint bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, BtLock lock)
{
uid page_no = ROOT_page, prevpage = 0;
BtLatchSet *set, *prevset;
uint drill = 0xff, slot;
uint mode, prevmode;
BtPool *prevpool;
+int foster = 0;
// start at root of btree and drill down
}
}
- prevpage = bt->page_no;
- prevpool = bt->pool;
- prevset = bt->set;
- prevmode = mode;
-
// find key on page at this level
// and either descend to requested level
// or return key slot
- slot = bt_findslot (bt, key, len);
+ if( slot = bt_findslot (bt, key, len) ) {
+ // is this slot < foster child area
+ // on the requested level?
- // is this slot < foster child area
- // on the requested level?
+ // if so, return actual slot even if dead
- // if so, return actual slot even if dead
+ if( slot <= bt->page->cnt - bt->page->foster )
+ if( drill == lvl )
+ return bt->foster = foster, slot;
- if( slot <= bt->page->cnt - bt->page->foster )
- if( drill == lvl )
- return slot;
+ // find next active slot
+ // note: foster children are never dead
- // find next active slot
+ while( slotptr(bt->page, slot)->dead )
+ if( slot++ < bt->page->cnt )
+ continue;
+ else {
+ // we are waiting for fence key posting
+ page_no = bt_getid(bt->page->right);
+ goto slideright;
+ }
- // note: foster children are never dead
- // nor fence keys for interiour nodes
+ // is this slot < foster child area
+ // if so, drill to next level
- while( slotptr(bt->page, slot)->dead )
- if( slot++ < bt->page->cnt )
- continue;
- else
- return bt->err = BTERR_struct, 0; // last key shouldn't be deleted
+ if( slot <= bt->page->cnt - bt->page->foster )
+ foster = 0, drill--;
+ else
+ foster = 1;
+
+ // continue right onto foster child
+ // or down to next level.
- // is this slot < foster child area
- // if so, drill to next level
+ page_no = bt_getid(slotptr(bt->page, slot)->id);
- if( slot <= bt->page->cnt - bt->page->foster )
- drill--;
+ // or slide right into next page
- // continue right onto foster child
- // or down to next level.
+ } else {
+ page_no = bt_getid(bt->page->right);
+ foster = 1;
+ }
- page_no = bt_getid(slotptr(bt->page, slot)->id);
+slideright:
+ prevpage = bt->page_no;
+ prevpool = bt->pool;
+ prevset = bt->set;
+ prevmode = mode;
} while( page_no );
return 0; // return error
}
-// find and delete key on page by marking delete flag bit
-// when leaf page becomes empty, delete it from the btree
+// remove empty page from the B-tree
+// by pulling our right node left over ourselves
-BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
+// call with bt->page, etc, set to page's locked parent
+// returns with page locked.
+
+BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot)
{
-unsigned char leftkey[256];
-BtLatchSet *rset, *set;
-BtPool *pool, *rpool;
-BtPage rpage, page;
-uid page_no, right;
-uint slot, tod;
+BtLatchSet *rset, *pset, *rpset;
+BtPool *rpool, *ppool, *rppool;
+BtPage rpage, ppage, rppage;
+uid right, parent, rparent;
BtKey ptr;
+uint idx;
- if( slot = bt_loadpage (bt, key, len, 0, BtLockWrite) )
- ptr = keyptr(bt->page, slot);
+ // cache node's parent page
+
+ parent = bt->page_no;
+ ppage = bt->page;
+ ppool = bt->pool;
+ pset = bt->set;
+
+ // lock and map our right page
+ // note that it cannot be our foster child
+ // since the our node is empty
+ // and it cannot be NULL because of the stopper
+ // in the last right page
+
+ bt_lockpage (BtLockWrite, set);
+
+ // if we aren't dead yet
+
+ if( page->act )
+ goto rmergexit;
+
+ if( right = bt_getid (page->right) )
+ if( rpool = bt_pinpool (bt, right) )
+ rpage = bt_page (bt, rpool, right);
+ else
+ return bt->err;
else
+ return bt->err = BTERR_struct;
+
+ rset = bt_pinlatch (bt, right);
+
+ // find our right neighbor
+
+ if( ppage->act > 1 ) {
+ for( idx = slot; idx++ < ppage->cnt; )
+ if( !slotptr(ppage, idx)->dead )
+ break;
+
+ if( idx > ppage->cnt )
+ return bt->err = BTERR_struct;
+
+ // redirect right neighbor in parent to left node
+
+ bt_putid(slotptr(ppage,idx)->id, page_no);
+ }
+
+ // if parent has only our deleted page, e.g. no right neighbor
+ // prepare to merge parent itself
+
+ if( ppage->act == 1 ) {
+ if( rparent = bt_getid (ppage->right) )
+ if( rppool = bt_pinpool (bt, rparent) )
+ rppage = bt_page (bt, rppool, rparent);
+ else
return bt->err;
+ else
+ return bt->err = BTERR_struct;
- // if key is found delete it, otherwise ignore request
- // note that fence keys of interiour nodes are not deleted.
+ rpset = bt_pinlatch (bt, rparent);
+ bt_lockpage (BtLockWrite, rpset);
- if( bt->found = !keycmp (ptr, key, len) )
- if( bt->found = slotptr(bt->page, slot)->dead == 0 ) {
- slotptr(bt->page,slot)->dead = 1;
- if( slot < bt->page->cnt )
- bt->page->dirty = 1;
- bt->page->act--;
+ // find our right neighbor on right parent page
+
+ for( idx = 0; idx++ < rppage->cnt; )
+ if( !slotptr(rppage, idx)->dead ) {
+ bt_putid (slotptr(rppage, idx)->id, page_no);
+ break;
}
- page_no = bt->page_no;
- pool = bt->pool;
- page = bt->page;
- set = bt->set;
+ if( idx > rppage->cnt )
+ return bt->err = BTERR_struct;
+ }
- // return if page is not empty or not found
+ // now that there are no more pointers to our right node
+ // we can wait for delete lock on it
- if( page->act || !bt->found ) {
- bt_unlockpage(BtLockWrite, set);
- bt_unpinlatch (set);
- bt_unpinpool (pool);
- return bt->err;
+ bt_lockpage(BtLockDelete, rset);
+ bt_lockpage(BtLockWrite, rset);
+
+ // pull contents of right page into our empty page
+
+ memcpy (page, rpage, bt->mgr->page_size);
+
+ // ready to release right parent lock
+ // now that we have a new page in place
+
+ if( ppage->act == 1 ) {
+ bt_unlockpage (BtLockWrite, rpset);
+ bt_unpinlatch (rpset);
+ bt_unpinpool (rppool);
}
- // cache copy of fence key of empty node
+ // add killed right block to free chain
+ // lock latch mgr
- ptr = keyptr(page, page->cnt);
- memcpy(leftkey, ptr, ptr->len + 1);
+ bt_spinwritelock(bt->mgr->latchmgr->lock);
- // release write lock on empty node
- // obtain Parent lock
+ // store free chain in allocation page second right
- bt_unlockpage(BtLockWrite, set);
- bt_lockpage(BtLockParent, set);
+ bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+ bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
- // load and lock parent to see
- // if delete of empty node is OK
- // ie, not a fence key of parent
+ // unlock latch mgr and right page
- while( 1 ) {
- if( slot = bt_loadpage (bt, leftkey+1, *leftkey, 1, BtLockWrite) )
- ptr = keyptr(bt->page, slot);
- else
+ bt_unlockpage(BtLockDelete, rset);
+ bt_unlockpage(BtLockWrite, rset);
+ bt_unpinlatch (rset);
+ bt_unpinpool (rpool);
+
+ bt_spinreleasewrite(bt->mgr->latchmgr->lock);
+
+ // delete our obsolete fence key from our parent
+
+ slotptr(ppage, slot)->dead = 1;
+ ppage->dirty = 1;
+
+ // if our parent now empty
+ // remove it from the tree
+
+ if( ppage->act-- == 1 )
+ if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) )
return bt->err;
- // does parent level contain our fence key yet?
- // and is it free of foster children?
+rmergexit:
+ bt_unlockpage (BtLockWrite, pset);
+ bt_unpinlatch (pset);
+ bt_unpinpool (ppool);
- if( !bt->page->foster )
- if( !keycmp (ptr, leftkey+1, *leftkey) )
- break;
+ bt->found = 1;
+ return bt->err = 0;
+}
+
+// remove empty page from the B-tree
+// try merging left first. If no left
+// sibling, then merge right.
+
+// call with page loaded and locked,
+// return with page locked.
+
+BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl)
+{
+unsigned char fencekey[256], postkey[256];
+uint slot, idx, postfence = 0;
+BtLatchSet *lset, *pset;
+BtPool *lpool, *ppool;
+BtPage lpage, ppage;
+uid left, parent;
+BtKey ptr;
+
+ ptr = keyptr(page, page->cnt);
+ memcpy(fencekey, ptr, ptr->len + 1);
+ bt_unlockpage (BtLockWrite, set);
+
+ // load and lock our parent
+
+retry:
+ if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) )
+ return bt->err;
- bt_unlockpage(BtLockWrite, bt->set);
- bt_unpinlatch (bt->set);
- bt_unpinpool (bt->pool);
+ parent = bt->page_no;
+ ppage = bt->page;
+ ppool = bt->pool;
+ pset = bt->set;
+
+ // wait until we are not a foster child
+
+ if( bt->foster ) {
+ bt_unlockpage (BtLockWrite, pset);
+ bt_unpinlatch (pset);
+ bt_unpinpool (ppool);
#ifdef unix
- sched_yield();
+ sched_yield();
#else
- SwitchToThread();
+ SwitchToThread();
#endif
+ goto retry;
}
- // find our left fence key
+ // find our left neighbor in our parent page
- while( slotptr(bt->page, slot)->dead )
- if( slot++ < bt->page->cnt )
- continue;
- else
- return bt->err = BTERR_struct; // last key shouldn't be deleted
+ for( idx = slot; --idx; )
+ if( !slotptr(ppage, idx)->dead )
+ break;
- // now we have both parent and child
+ // if no left neighbor, do right merge
- bt_lockpage(BtLockDelete, set);
- bt_lockpage(BtLockWrite, set);
+ if( !idx )
+ return bt_mergeright (bt, page, pool, set, page_no, lvl, slot);
- // return if page has no right sibling within parent
- // or if empty node is no longer empty
+ // lock and map our left neighbor's page
- if( page->act || slot == bt->page->cnt ) {
- // unpin parent
- bt_unlockpage(BtLockWrite, bt->set);
- bt_unpinlatch (bt->set);
- bt_unpinpool (bt->pool);
- // unpin empty node
- bt_unlockpage(BtLockParent, set);
- bt_unlockpage(BtLockDelete, set);
- bt_unlockpage(BtLockWrite, set);
- bt_unpinlatch (set);
- bt_unpinpool (pool);
+ left = bt_getid (slotptr(ppage, idx)->id);
+
+ if( lpool = bt_pinpool (bt, left) )
+ lpage = bt_page (bt, lpool, left);
+ else
return bt->err;
+
+ lset = bt_pinlatch (bt, left);
+ bt_lockpage(BtLockWrite, lset);
+
+ // wait until foster sibling is in our parent
+
+ if( bt_getid (lpage->right) != page_no ) {
+ bt_unlockpage (BtLockWrite, pset);
+ bt_unpinlatch (pset);
+ bt_unpinpool (ppool);
+ bt_unlockpage (BtLockWrite, lset);
+ bt_unpinlatch (lset);
+ bt_unpinpool (lpool);
+#ifdef linux
+ sched_yield();
+#else
+ SwitchToThread();
+#endif
+ goto retry;
}
- // lock and map our right page
- // note that it cannot be our foster child
- // since the our node is empty
+ // since our page will have no more pointers to it,
+ // obtain Delete lock and wait for write locks to clear
- right = bt_getid(page->right);
+ bt_lockpage(BtLockDelete, set);
+ bt_lockpage(BtLockWrite, set);
- if( rpool = bt_pinpool (bt, right) )
- rpage = bt_page (bt, rpool, right);
- else
- return bt->err;
+ // if we aren't dead yet,
+ // get ready for exit
- rset = bt_pinlatch (bt, right);
- bt_lockpage(BtLockWrite, rset);
- bt_lockpage(BtLockDelete, rset);
+ if( page->act ) {
+ bt_unlockpage(BtLockDelete, set);
+ bt_unlockpage(BtLockWrite, lset);
+ bt_unpinlatch (lset);
+ bt_unpinpool (lpool);
+ goto lmergexit;
+ }
- // pull contents of right page into empty page
+ // are we are the fence key for our parent?
+ // if so, grab our old fence key
- memcpy (page, rpage, bt->mgr->page_size);
+ if( postfence = slot == ppage->cnt ) {
+ ptr = keyptr (ppage, ppage->cnt);
+ memcpy(fencekey, ptr, ptr->len + 1);
+ memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
- // delete left parent slot for old empty page
- // and redirect right parent slot to it
+ // clear out other dead slots
- bt->page->act--;
- bt->page->dirty = 1;
- slotptr(bt->page, slot)->dead = 1;
+ while( --ppage->cnt )
+ if( slotptr(ppage, ppage->cnt)->dead )
+ memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot));
+ else
+ break;
- while( slot++ < bt->page->cnt )
- if( !slotptr(bt->page, slot)->dead )
- break;
+ ptr = keyptr (ppage, ppage->cnt);
+ memcpy(postkey, ptr, ptr->len + 1);
+ } else
+ slotptr(ppage,slot)->dead = 1;
- bt_putid(slotptr(bt->page,slot)->id, page_no);
+ ppage->dirty = 1;
+ ppage->act--;
- // release parent level lock
- // and our empty node lock
+ // push our right neighbor pointer to our left
- bt_unlockpage(BtLockWrite, set);
- bt_unlockpage(BtLockWrite, bt->set);
- bt_unpinlatch (bt->set);
- bt_unpinpool (bt->pool);
+ memcpy (lpage->right, page->right, BtId);
- // add killed right block to free chain
+ // add ourselves to free chain
// lock latch mgr
bt_spinwritelock(bt->mgr->latchmgr->lock);
// store free chain in allocation page second right
- bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
- bt_putid(bt->mgr->latchmgr->alloc[1].right, right);
+ bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
+ bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no);
- // unlock latch mgr and right page
+ // unlock latch mgr and pages
bt_spinreleasewrite(bt->mgr->latchmgr->lock);
+ bt_unlockpage(BtLockWrite, lset);
+ bt_unpinlatch (lset);
+ bt_unpinpool (lpool);
- bt_unlockpage(BtLockWrite, rset);
- bt_unlockpage(BtLockDelete, rset);
- bt_unpinlatch (rset);
- bt_unpinpool (rpool);
+ // release our node's delete lock
- // remove ParentModify lock
-
- bt_unlockpage(BtLockParent, set);
bt_unlockpage(BtLockDelete, set);
+
+lmergexit:
+ bt_unlockpage (BtLockWrite, pset);
+ bt_unpinpool (ppool);
+
+ // do we need to post parent's fence key in its parent?
+
+ if( !postfence || parent == ROOT_page ) {
+ bt_unpinlatch (pset);
+ bt->found = 1;
+ return bt->err = 0;
+ }
+
+ // interlock parent fence post
+
+ bt_lockpage (BtLockParent, pset);
+
+ // load parent's parent page
+posttry:
+ if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) )
+ return bt->err;
+
+ if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) )
+ if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
+ return bt->err;
+ else
+ goto posttry;
+
+ page = bt->page;
+
+ page->min -= *postkey + 1;
+ ((unsigned char *)page)[page->min] = *postkey;
+ memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey );
+ slotptr(page, slot)->off = page->min;
+
+ bt_unlockpage (BtLockParent, pset);
+ bt_unpinlatch (pset);
+
+ bt_unlockpage (BtLockWrite, bt->set);
+ bt_unpinlatch (bt->set);
+ bt_unpinpool (bt->pool);
+
+ bt->found = 1;
+ return bt->err = 0;
+}
+
+// find and delete key on page by marking delete flag bit
+// if page becomes empty, delete it from the btree
+
+BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
+{
+BtLatchSet *set;
+BtPool *pool;
+BtPage page;
+uid page_no;
+BtKey ptr;
+uint slot;
+
+ if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) )
+ return bt->err;
+
+ page_no = bt->page_no;
+ page = bt->page;
+ pool = bt->pool;
+ set = bt->set;
+
+ // if key is found delete it, otherwise ignore request
+
+ ptr = keyptr(page, slot);
+
+ if( bt->found = !keycmp (ptr, key, len) )
+ if( bt->found = slotptr(page, slot)->dead == 0 ) {
+ slotptr(page,slot)->dead = 1;
+ if( slot < page->cnt )
+ page->dirty = 1;
+ if( !--page->act )
+ if( bt_mergeleft (bt, page, pool, set, page_no, 0) )
+ return bt->err;
+ }
+
+ bt_unlockpage(BtLockWrite, set);
bt_unpinlatch (set);
bt_unpinpool (pool);
- return 0;
-}
+ return bt->err = 0;
+}
// find key in leaf level and return row-id
// 0 - page needs splitting
// >0 new slot value
-uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
+uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
{
uint nxt = bt->mgr->page_size;
-BtPage page = bt->page;
uint cnt = 0, idx = 0;
uint max = page->cnt;
-uint newslot;
+uint newslot = max;
BtKey key;
if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
// otherwise, remove deleted key
// note: foster children are never dead
- // nor are fence keys for interiour nodes
while( cnt++ < max ) {
if( cnt == slot )
newslot = idx + 1;
- else if( cnt < max && slotptr(bt->frame,cnt)->dead )
+ if( cnt < max && slotptr(bt->frame,cnt)->dead )
continue;
// copy key
// add key to current page
// page must already be writelocked
-void bt_addkeytopage (BtDb *bt, uint slot, unsigned char *key, uint len, uid id, uint tod)
+void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod)
{
-BtPage page = bt->page;
uint idx;
// find next available dead slot and copy key onto page
}
// split already locked full node
-// in current page variables
// return unlocked and unpinned.
-BTERR bt_splitpage (BtDb *bt)
+BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no)
{
uint slot, cnt, idx, max, nxt = bt->mgr->page_size;
unsigned char fencekey[256];
-uid page_no = bt->page_no;
-BtLatchSet *set = bt->set;
-BtPool *pool = bt->pool;
-BtPage page = bt->page;
uint tod = time(NULL);
uint lvl = page->lvl;
-uid new_page, right;
+uid new_page;
BtKey key;
// initialize frame buffer for right node
memset (bt->frame, 0, bt->mgr->page_size);
max = page->cnt - page->foster;
- tod = (uint)time(NULL);
cnt = max / 2;
idx = 0;
// transfer right link node to new right node
- if( page_no > ROOT_page ) {
- right = bt_getid (page->right);
- bt_putid(bt->frame->right, right);
- }
+ if( page_no > ROOT_page )
+ memcpy (bt->frame->right, page->right, BtId);
bt->frame->bits = bt->mgr->page_bits;
bt->frame->min = nxt;
page->cnt--;
page->act--;
+ bt_unlockpage (BtLockParent, set);
+
+ // if this emptied page,
+ // undo the foster child
+
+ if( !page->act )
+ if( bt_mergeleft (bt, page, pool, set, page_no, lvl) )
+ return bt->err;
+
// unlock and unpin
bt_unlockpage (BtLockWrite, set);
- bt_unlockpage (BtLockParent, set);
bt_unpinlatch (set);
bt_unpinpool (pool);
return 0;
// check if page has enough space
- if( slot = bt_cleanpage (bt, len, slot) )
+ if( slot = bt_cleanpage (bt, bt->page, len, slot) )
break;
- if( bt_splitpage (bt) )
+ if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) )
return bt->err;
}
- bt_addkeytopage (bt, slot, key, len, id, tod);
+ bt_addkeytopage (bt, bt->page, slot, key, len, id, tod);
bt_unlockpage (BtLockWrite, bt->set);
bt_unpinlatch (bt->set);
}
for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
- if( *(ushort *)bt->mgr->latchmgr->table[hashidx].latch )
+ if( *(uint *)bt->mgr->latchmgr->table[hashidx].latch )
fprintf(stderr, "latchmgr locked\n");
if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
set = bt->mgr->latchsets + idx;
- if( *(ushort *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent )
+ if( *(uint *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent )
fprintf(stderr, "latchset %d locked\n", idx);
if( set->hash != hashidx )
fprintf(stderr, "latchset %d wrong hashidx\n", idx);
bt_latchaudit (bt);
fprintf(stderr, "finished latch mgr audit\n");
break;
+
case 'w':
fprintf(stderr, "started indexing for %s\n", args->infile);
if( in = fopen (args->infile, "rb") )