bt_unlockpage (BtLockDelete, set->latch);
bt_unlockpage (BtLockWrite, set->latch);
+ bt_unpinlatch (set->latch);
// unlock allocation page
root->latch->dirty = 1;
bt_freepage (bt, child);
- bt_unpinlatch (child->latch);
} while( root->page->lvl > 1 && root->page->act == 1 );
bt_lockpage (BtLockDelete, right->latch);
bt_lockpage (BtLockWrite, right->latch);
bt_freepage (bt, right);
- bt_unpinlatch (right->latch);
bt_unlockpage (BtLockParent, set->latch);
bt_unpinlatch (set->latch);
else
return 0;
- if( set->page->free || set->page->lvl )
- return bt->err = BTERR_struct, 0;
-
// obtain read lock using lock chaining with Access mode
// release & unpin parent/left sibling page
if( !set->page->kill )
if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
bt_unlockpage(BtLockRead, set->latch);
- bt_lockpage(BtLockAtomic, set->latch);
bt_lockpage(BtLockAccess, set->latch);
+ bt_lockpage(BtLockAtomic, set->latch);
bt_lockpage(BtLockRead, set->latch);
bt_unlockpage(BtLockAccess, set->latch);
- if( slot = bt_findslot (set->page, key, len) )
+ if( !set->page->kill )
+ if( slot = bt_findslot (set->page, key, len) )
return slot;
bt_unlockpage(BtLockAtomic, set->latch);
return 0;
}
-uint bt_parentmatch (AtomicKey *head, uint entry)
+// delete an empty master page for a transaction
+
+// note that the far right page never empties because
+// it always contains (at least) the infinite stopper key
+// and that all pages that don't contain any keys are
+// deleted, or are being held under Atomic lock.
+
+BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
{
-uint cnt = 0;
+BtPageSet right[1], temp[1];
+unsigned char value[BtId];
+uid right_page_no;
+BtKey *ptr;
+
+ bt_lockpage(BtLockWrite, prev->latch);
+
+ // grab the right sibling
+
+ if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
+ right->page = bt_mappage (bt, right->latch);
+ else
+ return bt->err;
+
+ bt_lockpage(BtLockAtomic, right->latch);
+ bt_lockpage(BtLockWrite, right->latch);
+
+ // and pull contents over empty page
+ // while preserving master's left link
+
+ memcpy (right->page->left, prev->page->left, BtId);
+ memcpy (prev->page, right->page, bt->mgr->page_size);
+
+ // forward seekers to old right sibling
+ // to new page location in set
+
+ bt_putid (right->page->right, prev->latch->page_no);
+ right->latch->dirty = 1;
+ right->page->kill = 1;
+
+ // remove pointer to right page for searchers by
+ // changing right fence key to point to the master page
+
+ ptr = keyptr(right->page,right->page->cnt);
+ bt_putid (value, prev->latch->page_no);
+
+ if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+ return bt->err;
+
+ // now that master page is in good shape we can
+ // remove its locks.
+
+ bt_unlockpage (BtLockAtomic, prev->latch);
+ bt_unlockpage (BtLockWrite, prev->latch);
+
+ // fix master's right sibling's left pointer
+ // to remove scanner's poiner to the right page
+
+ if( right_page_no = bt_getid (prev->page->right) ) {
+ if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
+ temp->page = bt_mappage (bt, temp->latch);
+
+ bt_lockpage (BtLockWrite, temp->latch);
+ bt_putid (temp->page->left, prev->latch->page_no);
+ temp->latch->dirty = 1;
+
+ bt_unlockpage (BtLockWrite, temp->latch);
+ bt_unpinlatch (temp->latch);
+ } else { // master is now the far right page
+ bt_spinwritelock (bt->mgr->lock);
+ bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
+ bt_spinreleasewrite(bt->mgr->lock);
+ }
- if( head )
- do if( head->entry == entry )
- head->nounlock = 1, cnt++;
- while( head = head->next );
+ // now that there are no pointers to the right page
+ // we can delete it after the last read access occurs
- return cnt;
+ bt_unlockpage (BtLockWrite, right->latch);
+ bt_unlockpage (BtLockAtomic, right->latch);
+ bt_lockpage (BtLockDelete, right->latch);
+ bt_lockpage (BtLockWrite, right->latch);
+ bt_freepage (bt, right);
+ return 0;
}
// atomic modification of a batch of keys.
samepage = src;
// pick-up all splits from master page
+ // each one is already WriteLocked.
entry = prev->latch->split;
set->page = bt_mappage (bt, set->latch);
entry = set->latch->split;
- // delete empty master page
+ // delete empty master page by undoing its split
+ // (this is potentially another empty page)
+ // note that there are no new left pointers yet
if( !prev->page->act ) {
memcpy (set->page->left, prev->page->left, BtId);
memcpy (prev->page, set->page, bt->mgr->page_size);
bt_lockpage (BtLockDelete, set->latch);
bt_freepage (bt, set);
- bt_unpinlatch (set->latch);
prev->latch->dirty = 1;
continue;
prev->latch->split = set->latch->split;
bt_lockpage (BtLockDelete, set->latch);
bt_freepage (bt, set);
- bt_unpinlatch (set->latch);
continue;
}
tail = leaf;
+ // splice in the left link into the split page
+
bt_putid (set->page->left, prev->latch->page_no);
bt_lockpage(BtLockParent, prev->latch);
bt_unlockpage(BtLockWrite, prev->latch);
// (if all splits were reversed, latch->split == 0)
if( latch->split ) {
- // fix left pointer in master's original (now split) right sibling
- // or set rightmost page in page zero
+ // fix left pointer in master's original (now split)
+ // far right sibling or set rightmost page in page zero
if( right = bt_getid (prev->page->right) ) {
- if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
+ if( set->latch = bt_pinlatch (bt, right, 1) )
set->page = bt_mappage (bt, set->latch);
else
return -1;
// master page located in prev is empty, delete it
// by pulling over master's right sibling.
- // Delete empty master's fence key
+ // Remove empty master's fence key
ptr = keyptr(prev->page,prev->page->cnt);
- leaf = calloc (sizeof(AtomicKey), 1);
-
- memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
- leaf->page_no = prev->latch->page_no;
- leaf->entry = prev->latch->entry;
- leaf->type = 1;
-
- if( tail )
- tail->next = leaf;
- else
- head = leaf;
-
- tail = leaf;
-
- bt_lockpage(BtLockParent, prev->latch);
- bt_unlockpage(BtLockWrite, prev->latch);
- // grab master's right sibling
- // note that the far right page never empties because
- // it always contains (at least) the infinite stopper key
- // and that all pages that don't contain any keys have
- // been deleted, or are being deleted under write lock.
-
- if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
- set->page = bt_mappage (bt, set->latch);
- else
+ if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
return -1;
- bt_lockpage(BtLockWrite, set->latch);
-
- // and pull contents over empty page
- // while preserving master's left link
-
- memcpy (set->page->left, prev->page->left, BtId);
- memcpy (prev->page, set->page, bt->mgr->page_size);
-
- // forward seekers to old right sibling
- // to new page location in prev
+ // perform the remainder of the delete
+ // from the FIFO queue
- bt_putid (set->page->right, prev->latch->page_no);
- set->latch->dirty = 1;
- set->page->kill = 1;
-
- // add new fence key for new master page contents, delete
- // right sibling after parent posts the new master fence.
-
- ptr = keyptr(set->page,set->page->cnt);
leaf = calloc (sizeof(AtomicKey), 1);
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
leaf->page_no = prev->latch->page_no;
- leaf->entry = set->latch->entry;
+ leaf->entry = prev->latch->entry;
+ leaf->nounlock = 1;
leaf->type = 2;
- // see if right sibling key update is in the FIFO,
- // and ParentLock if not there.
-
- if( !bt_parentmatch (head, leaf->entry) )
- bt_lockpage(BtLockParent, set->latch);
-
- bt_unlockpage(BtLockWrite, set->latch);
-
if( tail )
tail->next = leaf;
else
tail = leaf;
- // fix new master's right sibling's left pointer
-
- if( right = bt_getid (set->page->right) ) {
- if( set->latch = bt_pinlatch (bt, right, 1) )
- set->page = bt_mappage (bt, set->latch);
-
- bt_lockpage (BtLockWrite, set->latch);
- bt_putid (set->page->left, prev->latch->page_no);
- set->latch->dirty = 1;
+ // leave atomic lock in place until
+ // deletion completes in next phase.
- bt_unlockpage (BtLockWrite, set->latch);
- bt_unpinlatch (set->latch);
- } else { // master is now the far right page
- bt_spinwritelock (bt->mgr->lock);
- bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
- bt_spinreleasewrite(bt->mgr->lock);
- }
-
- bt_unlockpage(BtLockAtomic, latch);
+ bt_unlockpage(BtLockWrite, prev->latch);
}
// add & delete keys for any pages split or merged during transaction
break;
- case 2: // insert key & free
- if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+ case 2: // free page
+ if( bt_atomicfree (bt, set) )
return -1;
- bt_lockpage (BtLockDelete, set->latch);
- bt_lockpage (BtLockWrite, set->latch);
- bt_freepage (bt, set);
break;
}