// start at root of btree and drill down
do {
- // determine lock mode of drill level
- mode = (drill == lvl) ? lock : BtLockRead;
-
if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
set->page = bt_mappage (mgr, set->latch);
else
return 0;
- // obtain access lock using lock chaining with Access mode
-
if( page_no > ROOT_page )
bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
}
// obtain mode lock using lock coupling through AccessLock
+ // determine lock mode of drill level
+ mode = (drill == lvl) ? lock : BtLockRead;
bt_lockpage(mode, set->latch, thread_no, __LINE__);
// grab our fence key
{
unsigned char *freechain;
+ // lock allocation page
+
+ bt_mutexlock (mgr->lock);
+
if( set->page->lvl ) {
freechain = mgr->pagezero->freechain;
mgr->pagezero->upperpages--;
mgr->pagezero->leafpages--;
}
- // lock allocation page
-
- bt_mutexlock (mgr->lock);
-
// store chain
memcpy(set->page->right, freechain, BtId);
if( !lvl )
page_size <<= mgr->leaf_xtra;
- // cache copy of original fence key
- // that is being deleted.
+ // cache original copy of original fence key
+ // that is going to be deleted.
ptr = keyptr(set->page, set->page->cnt);
memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
- // obtain locks on right page
+ // pin and lock our right page
page_no = bt_getid(set->page->right);
if( right->page->kill )
return mgr->line = __LINE__, mgr->err = BTERR_struct;
- // pull contents of right peer into our empty page
+ // pull contents of right sibling over our empty page
// preserving our left page number, and its right page number.
bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
page_no = bt_getid(set->page->left);
memcpy (set->page, right->page, page_size);
bt_putid (set->page->left, page_no);
- bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
-
set->page->page_no = set->latch->page_no;
+ bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
// fix left link from far right page
bt_releasemutex(mgr->lock);
}
- // mark right page deleted and release lock
+ // mark right page as being deleted and release lock
right->page->kill = 1;
bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
- // redirect higher key directly to our new node contents
+ // redirect the new higher key directly to our new node
- ptr = keyptr(right->page, right->page->cnt);
+ ptr = keyptr(set->page, set->page->cnt);
bt_putid (value, set->latch->page_no);
if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
return mgr->err;
- // delete our orignal fence key in parent
- // and unlock our page.
+ // delete our original fence key in parent
ptr = (BtKey *)lowerfence;
if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
return mgr->err;
+ // release write lock to our node
+
bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
- // obtain delete and write locks to right node
+ // wait for all access to drain away with delete lock,
+ // then obtain write lock to right node and free it.
bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
page->act = 0;
// clean up page first by
- // removing deleted keys
+ // removing dead keys
while( cnt++ < max ) {
if( cnt == slot )
slotptr(page, idx)->type = slotptr(frame, cnt)->type;
if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
- page->act++;
+ page->act++;
}
page->cnt = idx;
slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
- frame->act++;
+ frame->act++;
}
frame->cnt = idx;
BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
{
BtKey *key = keyptr(source, src);
+uint idx, slot, entry;
+BtLatchSet *latch;
BtPageSet set[1];
-uint idx, slot;
BtSlot *node;
BtKey *ptr;
BtVal *val;
- if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
- node = slotptr(set->page, slot);
- ptr = keyptr(set->page, slot);
- val = valptr(set->page, slot);
- } else
- return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
+ while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
+ node = slotptr(set->page, slot);
+ ptr = keyptr(set->page, slot);
+ val = valptr(set->page, slot);
- // if slot is not found, insert a delete slot
+ // if slot is not found on cache btree, insert a delete slot
+ // otherwise ignore the request.
if( keycmp (ptr, key->key, key->len) )
- if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
- return mgr->err;
+ if( !mgr->type )
+ if( slot = bt_cleanpage(mgr, set, key->len, slot, 0) )
+ return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete);
+ else { // split page before inserting Delete slot
+ if( entry = bt_splitpage (mgr, set, thread_no, 0) )
+ latch = mgr->leafsets + entry;
+ else
+ return mgr->err;
+
+ // splice right page into split chain
+ // and WriteLock it
+
+ bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
+ latch->split = set->latch->split;
+ set->latch->split = entry;
+
+ // clear slot number for atomic page
+
+ locks[src].slot = 0;
+ continue;
+ }
+ else
+ return 0;
// if node is already dead,
// ignore the request.
- if( node->dead )
+ if( node->type == Delete || node->dead )
return 0;
- set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
- set->page->lsn = lsn;
- set->page->act--;
+ node->type = Delete;
+
+ // if main LSM btree, delete the slot
+
+ if( mgr->type ) {
+ set->page->act--;
+ node->dead = 1;
+ }
- node->dead = 0;
__sync_fetch_and_add(&mgr->found, 1);
+ set->page->lsn = lsn;
return 0;
+ }
+
+ return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
}
// release master's splits from right to left
continue;
}
- // entry interiour node or being killed or constructed
+ // entry is being killed or constructed
- if( set->page->lvl || set->page->nopromote || set->page->kill ) {
+ if( set->page->nopromote || set->page->kill ) {
bt_releasemutex(set->latch->modify);
continue;
}
return 0;
// main key is larger
+ // return smaller key
if( cmp < 0 ) {
bt->phase = 0;