From 2710566cd35cf8b6bd25f2e8c17375d7f83c9cc8 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 18 Feb 2014 16:39:47 -0800 Subject: [PATCH] fix usage of BtLockParent --- btree2s.c | 194 ++++++++++++++++++++++++++---------------------------- 1 file changed, 93 insertions(+), 101 deletions(-) diff --git a/btree2s.c b/btree2s.c index af39f90..586dea7 100644 --- a/btree2s.c +++ b/btree2s.c @@ -1,5 +1,5 @@ // btree version 2s -// 13 FEB 2014 +// 18 FEB 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -1023,7 +1023,7 @@ uint good = 0; else good++; - // low is the next candidate, higher is already + // low is the lowest candidate, higher is already // tested as .ge. the given key, loop ends when they meet while( diff = higher - low ) { @@ -1088,7 +1088,7 @@ uint mode, prevmode; drill = bt->page->lvl; - if( lock == BtLockWrite && drill == lvl ) + if( lock != BtLockRead && drill == lvl ) if( bt_unlockpage(bt, page_no, mode) ) return 0; else @@ -1110,20 +1110,16 @@ uint mode, prevmode; if( slot++ < bt->page->cnt ) continue; else - break; - - // if the page has no active slots, - // move right otherwise drill down + goto slideright; - if( slot <= bt->page->cnt ) { - page_no = bt_getid(slotptr(bt->page, slot)->id); - drill--; - continue; - } + page_no = bt_getid(slotptr(bt->page, slot)->id); + drill--; + continue; } // or slide right into next page +slideright: page_no = bt_getid(bt->page->right); } while( page_no ); @@ -1135,30 +1131,31 @@ uint mode, prevmode; } // a fence key was deleted from a page -// push new fence value up to the root +// push new fence value upwards BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl) { unsigned char leftkey[256], rightkey[256]; BtKey ptr; -uid left; // remove deleted key, the old fence value - ptr = keyptr(bt->page, bt->page->cnt--); + ptr = keyptr(bt->page, bt->page->cnt); memcpy(rightkey, ptr, ptr->len + 1); - left = bt_getid (slotptr(bt->page, bt->page->cnt)->id); + memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot)); + bt->page->dirty = 1; + ptr = keyptr(bt->page, bt->page->cnt); memcpy(leftkey, ptr, ptr->len + 1); if( bt_update (bt, bt->page, page_no) ) return bt->err; - if( bt_unlockpage (bt, page_no, BtLockWrite) ) + if( bt_lockpage (bt, page_no, BtLockParent) ) return bt->err; - if( bt_lockpage (bt, left, BtLockParent) ) + if( bt_unlockpage (bt, page_no, BtLockWrite) ) return bt->err; // insert new (now smaller) fence key @@ -1166,12 +1163,12 @@ uid left; if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) ) return bt->err; - if( bt_unlockpage (bt, left, BtLockParent) ) - return bt->err; - // remove old (larger) fence key - return bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1); + if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) ) + return bt->err; + + return bt_unlockpage (bt, page_no, BtLockParent); } // root has a single child @@ -1221,11 +1218,10 @@ uint idx; BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl) { unsigned char lowerkey[256], higherkey[256]; -uint slot, dirty = 0, idx, fence; +uint slot, dirty = 0, idx, fence, found; uid page_no, right; BtKey ptr; -retry: if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) ) ptr = keyptr(bt->page, slot); else @@ -1237,11 +1233,10 @@ retry: // if key is found delete it, otherwise ignore request - if( !keycmp (ptr, key, len) ) - if( slotptr(bt->page, slot)->dead == 0 ) { + if( found = !keycmp (ptr, key, len) ) + if( found = slotptr(bt->page, slot)->dead == 0 ) { dirty = slotptr(bt->page,slot)->dead = 1; - if( slot < bt->page->cnt ) - bt->page->dirty = 1; + bt->page->dirty = 1; bt->page->act--; // collapse empty slots @@ -1257,27 +1252,31 @@ retry: right = bt_getid(bt->page->right); page_no = bt->page_no; - // did we delete our fence key in an upper level? + // did we delete a fence key in an upper level? if( dirty && lvl && bt->page->act && fence ) - return bt_fixfence (bt, page_no, lvl); + if( bt_fixfence (bt, page_no, lvl) ) + return bt->err; + else + return bt->found = found, 0; // is this a collapsed root? if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 ) - return bt_collapseroot (bt, bt->page); - - // return if page is not empty - - if( bt->page->act ) - if ( dirty && bt_update(bt, bt->page, page_no) ) + if( bt_collapseroot (bt, bt->page) ) return bt->err; else - return bt_unlockpage(bt, page_no, BtLockWrite); + return bt->found = found, 0; - // mark page being deleted + // return if page is not empty - bt->page->kill = 1; + if( bt->page->act ) { + if( dirty && bt_update(bt, bt->page, page_no) ) + return bt->err; + if( bt_unlockpage(bt, page_no, BtLockWrite) ) + return bt->err; + return bt->found = found, 0; + } // cache copy of fence key // in order to find parent @@ -1285,18 +1284,7 @@ retry: ptr = keyptr(bt->page, bt->page->cnt); memcpy(lowerkey, ptr, ptr->len + 1); - if( bt_update(bt, bt->page, page_no) ) - return bt->err; - - if( bt_unlockpage(bt, page_no, BtLockWrite) ) - return bt->err; - - // obtain Parent lock - - if( bt_lockpage(bt, page_no, BtLockParent) ) - return bt->err; - - // lock and map right page + // obtain lock on right page if ( bt_lockpage(bt, right, BtLockWrite) ) return bt->err; @@ -1304,27 +1292,8 @@ retry: if( bt_mappage (bt, &bt->temp, right) ) return bt->err; - // is our right sibling being deleted? - - if( bt->temp->kill ) { - if ( bt_unlockpage(bt, right, BtLockWrite) ) - return bt->err; - - if( bt_unlockpage(bt, page_no, BtLockParent) ) - return bt->err; -#ifdef linux - sched_yield (); -#else - SwitchToThread(); -#endif - goto retry; - } - - if( bt_lockpage(bt, page_no, BtLockWrite) ) - return bt->err; - - if( bt_mappage (bt, &bt->page, page_no) ) - return bt->err; + if( bt->temp->kill ) + return bt->err = BTERR_struct; // pull contents of next page into current empty page @@ -1347,15 +1316,21 @@ retry: if( bt_update(bt, bt->temp, right) ) return bt->err; - if( bt_unlockpage(bt, right, BtLockWrite) ) + if( bt_lockpage(bt, page_no, BtLockParent) ) return bt->err; if( bt_unlockpage(bt, page_no, BtLockWrite) ) return bt->err; + if( bt_lockpage(bt, right, BtLockParent) ) + return bt->err; + + if( bt_unlockpage(bt, right, BtLockWrite) ) + return bt->err; + // redirect higher key directly to consolidated node - if( bt_insertkey (bt, higherkey+1, *higherkey, lvl + 1, page_no, time(NULL)) ) + if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) ) return bt->err; // delete old lower key to consolidated node @@ -1363,21 +1338,22 @@ retry: if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) ) return bt->err; - // obtain delete lock on deleted node + // obtain write & delete lock on deleted node // add right block to free chain if( bt_lockpage(bt, right, BtLockDelete) ) return bt->err; - // obtain write lock on deleted node - if( bt_lockpage(bt, right, BtLockWrite) ) return bt->err; if( bt_freepage (bt, right) ) return bt->err; - // remove ParentModify lock + // remove ParentModify locks + + if( bt_unlockpage(bt, right, BtLockParent) ) + return bt->err; return bt_unlockpage(bt, page_no, BtLockParent); } @@ -1528,13 +1504,14 @@ uid page_no = bt->page_no, right; unsigned char fencekey[256]; BtPage page = bt->page; uint lvl = page->lvl; +uint prev; BtKey key; // split higher half of keys to bt->frame // the last key (fence key) might be dead memset (bt->frame, 0, bt->page_size); - max = (int)page->cnt; + max = page->cnt; cnt = max / 2; idx = 0; @@ -1588,7 +1565,7 @@ BtKey key; page->act++; } - // remember fence key for old page + // remember fence key for smaller page memcpy (fencekey, key, key->len + 1); bt_putid(page->right, right); @@ -1605,25 +1582,23 @@ BtKey key; if( bt_update(bt, page, page_no) ) return bt->err; - if( bt_unlockpage (bt, page_no, BtLockWrite) ) - return bt->err; + right = 0; // insert new fences in parent pages - do { + while( 1 ) { if( bt_lockpage (bt, page_no, BtLockParent) ) return bt->err; - if( bt_lockpage (bt, page_no, BtLockWrite) ) - return bt->err; - - if( bt_mappage (bt, &bt->page, page_no) ) + if( right ) + if( bt_mappage (bt, &bt->page, page_no) ) return bt->err; key = keyptr(bt->page, bt->page->cnt); memcpy (fencekey, key, key->len + 1); + prev = bt->page->posted; - if( bt->page->posted ) { + if( right && prev ) { if( bt_unlockpage (bt, page_no, BtLockParent) ) return bt->err; return bt_unlockpage (bt, page_no, BtLockWrite); @@ -1640,12 +1615,19 @@ BtKey key; // insert new fence for reformulated left block - if( bt_insertkey (bt, fencekey+1, *fencekey, lvl + 1, page_no, time(NULL)) ) + if( !prev ) + if( bt_insertkey (bt, fencekey+1, *fencekey, lvl + 1, page_no, time(NULL)) ) return bt->err; if( bt_unlockpage (bt, page_no, BtLockParent) ) return bt->err; - } while( page_no = right ); + + if( !(page_no = right) ) + break; + + if( bt_lockpage (bt, page_no, BtLockWrite) ) + return bt->err; + } return 0; } @@ -1675,12 +1657,14 @@ BtKey ptr; page = bt->page; if( !keycmp (ptr, key, len) ) { - slotptr(page, slot)->dead = 0; - slotptr(page, slot)->tod = tod; - bt_putid(slotptr(page,slot)->id, id); - if ( bt_update(bt, bt->page, bt->page_no) ) - return bt->err; - return bt_unlockpage(bt, bt->page_no, BtLockWrite); + if( slotptr(page, slot)->dead ) + page->act++; + slotptr(page, slot)->dead = 0; + slotptr(page, slot)->tod = tod; + bt_putid(slotptr(page,slot)->id, id); + if ( bt_update(bt, bt->page, bt->page_no) ) + return bt->err; + return bt_unlockpage(bt, bt->page_no, BtLockWrite); } // check if page has enough space @@ -1718,7 +1702,7 @@ BtKey ptr; slotptr(page, slot)->tod = tod; slotptr(page, slot)->dead = 0; - if ( bt_update(bt, bt->page, bt->page_no) ) + if( bt_update(bt, bt->page, bt->page_no) ) return bt->err; return bt_unlockpage(bt, bt->page_no, BtLockWrite); @@ -1731,11 +1715,16 @@ uint bt_startkey (BtDb *bt, unsigned char *key, uint len) uint slot; // cache page for retrieval + if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) ) - memcpy (bt->cursor, bt->page, bt->page_size); + memcpy (bt->cursor, bt->page, bt->page_size); + else + return 0; + bt->cursor_page = bt->page_no; + if ( bt_unlockpage(bt, bt->page_no, BtLockRead) ) - return 0; + return 0; return slot; } @@ -1749,6 +1738,7 @@ off64_t right; do { right = bt_getid(bt->cursor->right); + while( slot++ < bt->cursor->cnt ) if( slotptr(bt->cursor,slot)->dead ) continue; @@ -1762,17 +1752,19 @@ off64_t right; bt->cursor_page = right; - if( bt_lockpage(bt, right,BtLockRead) ) + if( bt_lockpage(bt, right, BtLockRead) ) return 0; if( bt_mappage (bt, &bt->page, right) ) - break; + return 0; memcpy (bt->cursor, bt->page, bt->page_size); + if ( bt_unlockpage(bt, right, BtLockRead) ) return 0; slot = 0; + } while( 1 ); return bt->err = 0; -- 2.40.0