From: unknown Date: Sun, 28 Sep 2014 20:32:36 +0000 (-0700) Subject: Fix a couple of bugs in atomicfree X-Git-Url: https://pd.if.org/git/?p=btree;a=commitdiff_plain;h=75e9a39cab3db94c792cbb16d56ed223ef5523a3 Fix a couple of bugs in atomicfree --- diff --git a/threadskv8.c b/threadskv8.c index 91ec929..16ad09f 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -1395,6 +1395,7 @@ void bt_freepage (BtDb *bt, BtPageSet *set) bt_unlockpage (BtLockDelete, set->latch); bt_unlockpage (BtLockWrite, set->latch); + bt_unpinlatch (set->latch); // unlock allocation page @@ -1476,7 +1477,6 @@ uint idx; root->latch->dirty = 1; bt_freepage (bt, child); - bt_unpinlatch (child->latch); } while( root->page->lvl > 1 && root->page->act == 1 ); @@ -1563,7 +1563,6 @@ BtKey *ptr; bt_lockpage (BtLockDelete, right->latch); bt_lockpage (BtLockWrite, right->latch); bt_freepage (bt, right); - bt_unpinlatch (right->latch); bt_unlockpage (BtLockParent, set->latch); bt_unpinlatch (set->latch); @@ -2305,9 +2304,6 @@ uint slot; else return 0; - if( set->page->free || set->page->lvl ) - return bt->err = BTERR_struct, 0; - // obtain read lock using lock chaining with Access mode // release & unpin parent/left sibling page @@ -2325,12 +2321,13 @@ uint slot; if( !set->page->kill ) if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) { bt_unlockpage(BtLockRead, set->latch); - bt_lockpage(BtLockAtomic, set->latch); bt_lockpage(BtLockAccess, set->latch); + bt_lockpage(BtLockAtomic, set->latch); bt_lockpage(BtLockRead, set->latch); bt_unlockpage(BtLockAccess, set->latch); - if( slot = bt_findslot (set->page, key, len) ) + if( !set->page->kill ) + if( slot = bt_findslot (set->page, key, len) ) return slot; bt_unlockpage(BtLockAtomic, set->latch); @@ -2437,16 +2434,88 @@ BtVal *val; return 0; } -uint bt_parentmatch (AtomicKey *head, uint entry) +// delete an empty master page for a transaction + +// note that the far right page never empties because +// it always contains (at least) the infinite stopper key +// and that all pages that don't contain any keys are +// deleted, or are being held under Atomic lock. + +BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev) { -uint cnt = 0; +BtPageSet right[1], temp[1]; +unsigned char value[BtId]; +uid right_page_no; +BtKey *ptr; + + bt_lockpage(BtLockWrite, prev->latch); + + // grab the right sibling + + if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) ) + right->page = bt_mappage (bt, right->latch); + else + return bt->err; + + bt_lockpage(BtLockAtomic, right->latch); + bt_lockpage(BtLockWrite, right->latch); + + // and pull contents over empty page + // while preserving master's left link + + memcpy (right->page->left, prev->page->left, BtId); + memcpy (prev->page, right->page, bt->mgr->page_size); + + // forward seekers to old right sibling + // to new page location in set + + bt_putid (right->page->right, prev->latch->page_no); + right->latch->dirty = 1; + right->page->kill = 1; + + // remove pointer to right page for searchers by + // changing right fence key to point to the master page + + ptr = keyptr(right->page,right->page->cnt); + bt_putid (value, prev->latch->page_no); + + if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) + return bt->err; + + // now that master page is in good shape we can + // remove its locks. + + bt_unlockpage (BtLockAtomic, prev->latch); + bt_unlockpage (BtLockWrite, prev->latch); + + // fix master's right sibling's left pointer + // to remove scanner's poiner to the right page + + if( right_page_no = bt_getid (prev->page->right) ) { + if( temp->latch = bt_pinlatch (bt, right_page_no, 1) ) + temp->page = bt_mappage (bt, temp->latch); + + bt_lockpage (BtLockWrite, temp->latch); + bt_putid (temp->page->left, prev->latch->page_no); + temp->latch->dirty = 1; + + bt_unlockpage (BtLockWrite, temp->latch); + bt_unpinlatch (temp->latch); + } else { // master is now the far right page + bt_spinwritelock (bt->mgr->lock); + bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); + bt_spinreleasewrite(bt->mgr->lock); + } - if( head ) - do if( head->entry == entry ) - head->nounlock = 1, cnt++; - while( head = head->next ); + // now that there are no pointers to the right page + // we can delete it after the last read access occurs - return cnt; + bt_unlockpage (BtLockWrite, right->latch); + bt_unlockpage (BtLockAtomic, right->latch); + bt_lockpage (BtLockDelete, right->latch); + bt_lockpage (BtLockWrite, right->latch); + bt_freepage (bt, right); + return 0; } // atomic modification of a batch of keys. @@ -2613,6 +2682,7 @@ int type; samepage = src; // pick-up all splits from master page + // each one is already WriteLocked. entry = prev->latch->split; @@ -2621,14 +2691,15 @@ int type; set->page = bt_mappage (bt, set->latch); entry = set->latch->split; - // delete empty master page + // delete empty master page by undoing its split + // (this is potentially another empty page) + // note that there are no new left pointers yet if( !prev->page->act ) { memcpy (set->page->left, prev->page->left, BtId); memcpy (prev->page, set->page, bt->mgr->page_size); bt_lockpage (BtLockDelete, set->latch); bt_freepage (bt, set); - bt_unpinlatch (set->latch); prev->latch->dirty = 1; continue; @@ -2641,7 +2712,6 @@ int type; prev->latch->split = set->latch->split; bt_lockpage (BtLockDelete, set->latch); bt_freepage (bt, set); - bt_unpinlatch (set->latch); continue; } @@ -2662,6 +2732,8 @@ int type; tail = leaf; + // splice in the left link into the split page + bt_putid (set->page->left, prev->latch->page_no); bt_lockpage(BtLockParent, prev->latch); bt_unlockpage(BtLockWrite, prev->latch); @@ -2672,11 +2744,11 @@ int type; // (if all splits were reversed, latch->split == 0) if( latch->split ) { - // fix left pointer in master's original (now split) right sibling - // or set rightmost page in page zero + // fix left pointer in master's original (now split) + // far right sibling or set rightmost page in page zero if( right = bt_getid (prev->page->right) ) { - if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) ) + if( set->latch = bt_pinlatch (bt, right, 1) ) set->page = bt_mappage (bt, set->latch); else return -1; @@ -2731,71 +2803,24 @@ int type; // master page located in prev is empty, delete it // by pulling over master's right sibling. - // Delete empty master's fence key + // Remove empty master's fence key ptr = keyptr(prev->page,prev->page->cnt); - leaf = calloc (sizeof(AtomicKey), 1); - - memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); - leaf->page_no = prev->latch->page_no; - leaf->entry = prev->latch->entry; - leaf->type = 1; - - if( tail ) - tail->next = leaf; - else - head = leaf; - - tail = leaf; - - bt_lockpage(BtLockParent, prev->latch); - bt_unlockpage(BtLockWrite, prev->latch); - // grab master's right sibling - // note that the far right page never empties because - // it always contains (at least) the infinite stopper key - // and that all pages that don't contain any keys have - // been deleted, or are being deleted under write lock. - - if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) ) - set->page = bt_mappage (bt, set->latch); - else + if( bt_deletekey (bt, ptr->key, ptr->len, 1) ) return -1; - bt_lockpage(BtLockWrite, set->latch); - - // and pull contents over empty page - // while preserving master's left link - - memcpy (set->page->left, prev->page->left, BtId); - memcpy (prev->page, set->page, bt->mgr->page_size); - - // forward seekers to old right sibling - // to new page location in prev + // perform the remainder of the delete + // from the FIFO queue - bt_putid (set->page->right, prev->latch->page_no); - set->latch->dirty = 1; - set->page->kill = 1; - - // add new fence key for new master page contents, delete - // right sibling after parent posts the new master fence. - - ptr = keyptr(set->page,set->page->cnt); leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; - leaf->entry = set->latch->entry; + leaf->entry = prev->latch->entry; + leaf->nounlock = 1; leaf->type = 2; - // see if right sibling key update is in the FIFO, - // and ParentLock if not there. - - if( !bt_parentmatch (head, leaf->entry) ) - bt_lockpage(BtLockParent, set->latch); - - bt_unlockpage(BtLockWrite, set->latch); - if( tail ) tail->next = leaf; else @@ -2803,25 +2828,10 @@ int type; tail = leaf; - // fix new master's right sibling's left pointer - - if( right = bt_getid (set->page->right) ) { - if( set->latch = bt_pinlatch (bt, right, 1) ) - set->page = bt_mappage (bt, set->latch); - - bt_lockpage (BtLockWrite, set->latch); - bt_putid (set->page->left, prev->latch->page_no); - set->latch->dirty = 1; + // leave atomic lock in place until + // deletion completes in next phase. - bt_unlockpage (BtLockWrite, set->latch); - bt_unpinlatch (set->latch); - } else { // master is now the far right page - bt_spinwritelock (bt->mgr->lock); - bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); - bt_spinreleasewrite(bt->mgr->lock); - } - - bt_unlockpage(BtLockAtomic, latch); + bt_unlockpage(BtLockWrite, prev->latch); } // add & delete keys for any pages split or merged during transaction @@ -2847,13 +2857,10 @@ int type; break; - case 2: // insert key & free - if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) + case 2: // free page + if( bt_atomicfree (bt, set) ) return -1; - bt_lockpage (BtLockDelete, set->latch); - bt_lockpage (BtLockWrite, set->latch); - bt_freepage (bt, set); break; }