From: unknown Date: Thu, 13 Feb 2014 20:01:55 +0000 (-0800) Subject: incorporate reworked bt_deletekey into file I/O version X-Git-Url: https://pd.if.org/git/?p=btree;a=commitdiff_plain;h=79cab58a40ce6639426bb4f782eee9dc3f881924 incorporate reworked bt_deletekey into file I/O version --- diff --git a/btree2t.c b/btree2t.c index 00c6f24..67d901e 100644 --- a/btree2t.c +++ b/btree2t.c @@ -1,5 +1,5 @@ // btree version 2t -// 04 FEB 2014 +// 15 FEB 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -119,10 +119,13 @@ typedef struct BtPage_ { uint cnt; // count of keys in page uint act; // count of active keys uint min; // next key offset - unsigned char bits; // page size in bits - unsigned char lvl:6; // level of page + unsigned char bits:7; // page size in bits + unsigned char free:1; // page is on free list + unsigned char lvl:4; // level of page + unsigned char kill:1; // page is empty unsigned char dirty:1; // page is dirty unsigned char posted:1; // page fence is posted + unsigned char goright:1; // continue to right link unsigned char right[BtId]; // page number to right unsigned char fence[256]; // page fence key } *BtPage; @@ -160,6 +163,7 @@ typedef struct _BtDb { uint mode; // read-write mode uint mapped_io; // use memory mapping BtPage temp; // temporary frame buffer (memory mapped/file IO) + BtPage temp2; // temporary frame buffer (memory mapped/file IO) BtPage parent; // current page's parent node (memory mapped/file IO) BtPage alloc; // frame for alloc page (memory mapped/file IO) BtPage cursor; // cached frame for start/next (never mapped) @@ -216,6 +220,7 @@ extern uint bt_tod (BtDb *bt, uint slot); // BTree page number constants #define ALLOC_page 0 #define ROOT_page 1 +#define LEAF_page 2 // Number of levels to create in a new BTree @@ -565,10 +570,10 @@ SYSTEM_INFO sysinfo[1]; bt->seg_bits++; #ifdef unix - bt->mem = malloc (7 *bt->page_size); + bt->mem = malloc (8 *bt->page_size); bt->cache = calloc (bt->hashsize, sizeof(ushort)); #else - bt->mem = VirtualAlloc(NULL, 7 * bt->page_size, MEM_COMMIT, PAGE_READWRITE); + bt->mem = VirtualAlloc(NULL, 8 * bt->page_size, MEM_COMMIT, PAGE_READWRITE); bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, bt->hashsize * sizeof(ushort)); #endif bt->frame = (BtPage)bt->mem; @@ -576,8 +581,9 @@ SYSTEM_INFO sysinfo[1]; bt->page = (BtPage)(bt->mem + 2 * bt->page_size); bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size); bt->temp = (BtPage)(bt->mem + 4 * bt->page_size); - bt->parent = (BtPage)(bt->mem + 5 * bt->page_size); - bt->zero = (BtPage)(bt->mem + 6 * bt->page_size); + bt->temp2 = (BtPage)(bt->mem + 5 * bt->page_size); + bt->parent = (BtPage)(bt->mem + 6 * bt->page_size); + bt->zero = (BtPage)(bt->mem + 7 * bt->page_size); if( size || *amt ) { if ( bt_unlockpage(bt, ALLOC_page, lockmode) ) @@ -981,18 +987,12 @@ int bt_findslot (BtPageSet *set, unsigned char *key, uint len) { uint diff, higher = set->page->cnt, low = 1, slot; - // is page being deleted? if so, - // tell caller to follow right link - - if( !set->page->act ) - return 0; - // make stopper key an infinite fence value if( bt_getid (set->page->right) ) higher++; - // low is the next candidate, higher is already + // low is the lowest candidate, higher is already // tested as .ge. the given key, loop ends when they meet while( diff = higher - low ) { @@ -1025,7 +1025,6 @@ int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lv uid page_no = ROOT_page, prevpage = 0; uint drill = 0xff, slot; uint mode, prevmode; -int posted = 1; // start at root of btree and drill down @@ -1077,12 +1076,35 @@ int posted = 1; prevpage = page_no; prevmode = mode; + // if page is being deleted and we should continue right + + if( set->page->kill && set->page->goright ) { + page_no = bt_getid (set->page->right); + continue; + } + + // otherwise, wait for deleted node to clear + + if( set->page->kill ) { + if( bt_unlockpage(bt, set->page_no, mode) ) + return bt->err; + page_no = ROOT_page; + prevpage = 0; + drill = 0xff; +#ifdef unix + sched_yield(); +#else + SwitchToThread(); +#endif + continue; + } + // find key on page at this level // and descend to requested level if( slot = bt_findslot (set, key, len) ) { if( drill == lvl ) - return bt->posted = posted, slot; + return slot; if( slot > set->page->cnt ) return bt->err = BTERR_struct, 0; @@ -1096,7 +1118,6 @@ int posted = 1; return bt->err = BTERR_struct, 0; page_no = bt_getid(slotptr(set->page, slot)->id); - posted = 1; drill--; continue; } @@ -1105,7 +1126,6 @@ int posted = 1; // (slide left from deleted page) page_no = bt_getid(set->page->right); - posted = 0; } while( page_no ); @@ -1116,92 +1136,78 @@ int posted = 1; } // drill down fixing fence values for left sibling tree -// starting with current left page in bt->temp -// call with bt->temp write locked -// return with all pages unlocked. -BTERR bt_fixfences (BtDb *bt, uid page_no, unsigned char *newfence) +// call with set write locked mapped to bt->temp +// return with set unlocked & unpinned. + +BTERR bt_fixfences (BtDb *bt, BtPageSet *set, unsigned char *newfence) { unsigned char oldfence[256]; -uid prevpage = 0; +BtPageSet next[1]; +uid right; int chk; - memcpy (oldfence, bt->temp->fence, 256); - - // start at left page of btree and drill down - // to update their fence values - - do { - // obtain access lock using lock chaining + next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id); + memcpy (oldfence, set->page->fence, 256); + next->page = bt->temp2; + bt->temp2 = bt->temp; + bt->temp = next->page; - if( prevpage ) { - if( bt_unlockpage(bt, prevpage, BtLockWrite) ) + while( !set->page->kill && set->page->lvl ) { + if( bt_lockpage (bt, next->page_no, BtLockParent) ) return bt->err; - if( bt_unlockpage(bt, prevpage, BtLockParent) ) + if( bt_lockpage (bt, next->page_no, BtLockAccess) ) return bt->err; - - // obtain parent/fence key maintenance lock - - if( bt_lockpage(bt, page_no, BtLockParent) ) - return bt->err; - - if( bt_lockpage(bt, page_no, BtLockAccess) ) - return bt->err; - - if( bt_unlockpage(bt, prevpage, BtLockWrite) ) + if( bt_lockpage (bt, next->page_no, BtLockWrite) ) + return bt->err; + if( bt_unlockpage (bt, next->page_no, BtLockAccess) ) return bt->err; - // obtain write lock using lock chaining - - if( bt_lockpage(bt, page_no, BtLockWrite) ) - return bt->err; - - if( bt_unlockpage(bt, page_no, BtLockAccess) ) - return bt->err; - - // map/obtain page contents - - if( bt_mappage (bt, &bt->temp, page_no) ) + if( bt_mappage (bt, &next->page, next->page_no) ) return bt->err; - } - chk = keycmp ((BtKey)bt->temp->fence, oldfence + 1, *oldfence); - prevpage = page_no; + chk = keycmp ((BtKey)next->page->fence, oldfence + 1, *oldfence); if( chk < 0 ) { - page_no = bt_getid (bt->temp->right); - continue; + right = bt_getid (next->page->right); + if( bt_unlockpage (bt, next->page_no, BtLockWrite) ) + return bt->err; + if( bt_unlockpage (bt, next->page_no, BtLockParent) ) + return bt->err; + next->page_no = right; + continue; } if( chk > 0 ) - return bt->err = BTERR_struct; - - memcpy (bt->temp->fence, newfence, 256); - - if( bt_update (bt, bt->temp, page_no) ) - return bt->err; + return bt->err = BTERR_struct; - // return when we reach a leaf page + if( bt_fixfences (bt, next, newfence) ) + return bt->err; - if( !bt->temp->lvl ) { - if( bt_unlockpage (bt, page_no, BtLockWrite) ) - return bt->err; - return bt_unlockpage (bt, page_no, BtLockParent); - } + break; + } - page_no = bt_getid(slotptr(bt->temp, bt->temp->cnt)->id); + set->page = bt->temp; - } while( page_no ); + if( bt_mappage (bt, &set->page, set->page_no) ) + return bt->err; - // return error on end of right chain + memcpy (set->page->fence, newfence, 256); - return bt->err = BTERR_struct; + if( bt_update(bt, set->page, set->page_no) ) + return bt->err; + if( bt_unlockpage (bt, set->page_no, BtLockWrite) ) + return bt->err; + if( bt_unlockpage (bt, set->page_no, BtLockParent) ) + return bt->err; + return 0; } + // return page to free list // page must be delete & write locked -BTERR bt_freepage (BtDb *bt, BtPage page, uid page_no) +BTERR bt_freepage (BtDb *bt, BtPageSet *set) { // lock & map allocation page @@ -1212,12 +1218,13 @@ BTERR bt_freepage (BtDb *bt, BtPage page, uid page_no) return bt->err; // store chain in second right - bt_putid(page->right, bt_getid(bt->alloc[1].right)); - bt_putid(bt->alloc[1].right, page_no); + bt_putid(set->page->right, bt_getid(bt->alloc[1].right)); + bt_putid(bt->alloc[1].right, set->page_no); + set->page->free = 1; if( bt_update(bt, bt->alloc, ALLOC_page) ) return bt->err; - if( bt_update(bt, page, page_no) ) + if( bt_update(bt, set->page, set->page_no) ) return bt->err; // unlock page zero @@ -1227,15 +1234,15 @@ BTERR bt_freepage (BtDb *bt, BtPage page, uid page_no) // remove write lock on deleted node - if( bt_unlockpage(bt, page_no, BtLockWrite) ) + if( bt_unlockpage(bt, set->page_no, BtLockWrite) ) return bt->err; - return bt_unlockpage (bt, page_no, BtLockDelete); + return bt_unlockpage (bt, set->page_no, BtLockDelete); } // remove the root level by promoting its only child -BTERR bt_removeroot (BtDb *bt, BtPage root, BtPage child, uid page_no) +BTERR bt_removeroot (BtDb *bt, BtPageSet *root, BtPageSet *child) { uid next = 0; @@ -1246,20 +1253,20 @@ uid next = 0; if( bt_lockpage (bt, next, BtLockWrite) ) return bt->err; - if( bt_mappage (bt, &child, next) ) + if( bt_mappage (bt, &child->page, next) ) return bt->err; - page_no = next; + child->page_no = next; } - memcpy (root, child, bt->page_size); - next = bt_getid (slotptr(child, child->cnt)->id); + memcpy (root->page, child->page, bt->page_size); + next = bt_getid (slotptr(child->page, child->page->cnt)->id); - if( bt_freepage (bt, child, page_no) ) + if( bt_freepage (bt, child) ) return bt->err; - } while( root->lvl > 1 && root->cnt == 1 ); + } while( root->page->lvl > 1 && root->page->cnt == 1 ); - if( bt_update (bt, root, ROOT_page) ) + if( bt_update (bt, root->page, ROOT_page) ) return bt->err; return bt_unlockpage (bt, ROOT_page, BtLockWrite); @@ -1267,42 +1274,12 @@ uid next = 0; // pull right page over ourselves in simple merge -BTERR bt_mergeright (BtDb *bt, uid page_no, BtPageSet *parent, uint slot) +BTERR bt_mergeright (BtDb *bt, BtPageSet *set, BtPageSet *parent, BtPageSet *right, uint slot, uint idx) { -uid right; -uint idx; - - // find our right neighbor - // right must exist because the stopper prevents - // the rightmost page from deleting - - for( idx = slot; idx++ < parent->page->cnt; ) - if( !slotptr(parent->page, idx)->dead ) - break; - - right = bt_getid (slotptr (parent->page, idx)->id); - - if( right != bt_getid (bt->page->right) ) - return bt->err = BTERR_struct; - - if( bt_lockpage (bt, right, BtLockDelete) ) - return bt->err; - - if( bt_lockpage (bt, right, BtLockWrite) ) - return bt->err; - - if( bt_mappage (bt, &bt->temp, right) ) - return bt->err; - - memcpy (bt->page, bt->temp, bt->page_size); - - if( bt_update(bt, bt->page, page_no) ) - return bt->err; - // install ourselves as child page // and delete ourselves from parent - bt_putid (slotptr(parent->page, idx)->id, page_no); + bt_putid (slotptr(parent->page, idx)->id, set->page_no); slotptr(parent->page, slot)->dead = 1; parent->page->act--; @@ -1315,15 +1292,20 @@ uint idx; } else break; - if( bt_freepage (bt, bt->temp, right) ) + memcpy (set->page, right->page, bt->page_size); + + if( bt_unlockpage (bt, right->page_no, BtLockParent) ) + return bt->err; + + if( bt_freepage (bt, right) ) return bt->err; // do we need to remove a btree level? // (leave the first page of leaves alone) if( parent->page_no == ROOT_page && parent->page->cnt == 1 ) - if( bt->page->lvl ) - return bt_removeroot (bt, parent->page, bt->page, page_no); + if( set->page->lvl ) + return bt_removeroot (bt, parent, set); if( bt_update (bt, parent->page, parent->page_no) ) return bt->err; @@ -1331,10 +1313,13 @@ uint idx; if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) return bt->err; - if( bt_unlockpage (bt, page_no, BtLockWrite) ) + if( bt_update (bt, set->page, set->page_no) ) return bt->err; - if( bt_unlockpage (bt, page_no, BtLockDelete) ) + if( bt_unlockpage (bt, set->page_no, BtLockWrite) ) + return bt->err; + + if( bt_unlockpage (bt, set->page_no, BtLockDelete) ) return bt->err; return 0; @@ -1343,94 +1328,67 @@ uint idx; // remove both child and parent from the btree // from the fence position in the parent -BTERR bt_removeparent (BtDb *bt, uid page_no, BtPageSet *parent, uint lvl) +BTERR bt_removeparent (BtDb *bt, BtPageSet *child, BtPageSet *parent, BtPageSet *right, BtPageSet *rparent, uint lvl) { -unsigned char rightfence[256], pagefence[256]; -uid right, ppage_no; - - right = bt_getid (bt->page->right); - - if( bt_lockpage (bt, right, BtLockParent) ) - return bt->err; - - if( bt_lockpage (bt, right, BtLockAccess) ) - return bt->err; - - if( bt_lockpage (bt, right, BtLockWrite) ) - return bt->err; - - if( bt_unlockpage (bt, right, BtLockAccess) ) - return bt->err; - - if( bt_mappage (bt, &bt->temp, right) ) - return bt->err; - - // save right page fence value and - // parent fence value - - memcpy (rightfence, bt->temp->fence, 256); - memcpy (pagefence, parent->page->fence, 256); - ppage_no = parent->page_no; +unsigned char pagefence[256]; +uint idx; // pull right sibling over ourselves and unlock - memcpy (bt->page, bt->temp, bt->page_size); - - if( bt_update(bt, bt->page, page_no) ) - return bt->err; + memcpy (child->page, right->page, bt->page_size); - if( bt_unlockpage (bt, page_no, BtLockDelete) ) + if( bt_update(bt, child->page, child->page_no) ) return bt->err; - if( bt_unlockpage (bt, page_no, BtLockWrite) ) + if( bt_unlockpage (bt, child->page_no, BtLockWrite) ) return bt->err; - // install ourselves into right link from old right page + // install ourselves into right link of old right page - bt->temp->act = 0; // tell bt_findslot to go right (left) - bt_putid (bt->temp->right, page_no); + bt_putid (right->page->right, child->page_no); + right->page->goright = 1; // tell bt_loadpage to go right to us + right->page->kill = 1; - if( bt_update (bt, bt->temp, right) ) + if( bt_update(bt, right->page, right->page_no) ) return bt->err; - if( bt_unlockpage (bt, right, BtLockWrite) ) + if( bt_unlockpage (bt, right->page_no, BtLockWrite) ) return bt->err; // remove our slot from our parent - // clear act to signal bt_findslot to move right + // signal to move right - slotptr(parent->page, parent->page->cnt)->dead = 1; - parent->page->act = 0; + parent->page->goright = 1; // tell bt_findslot to go right to rparent + parent->page->kill = 1; + parent->page->act--; - if( bt_update (bt, parent->page, ppage_no) ) - return bt->err; - if( bt_unlockpage (bt, ppage_no, BtLockWrite) ) - return bt->err; + // redirect right page pointer in right parent to us - // redirect right page pointer in its parent to our left - // and free the right page + for( idx = 0; idx++ < rparent->page->cnt; ) + if( !slotptr(rparent->page, idx)->dead ) + break; - if( bt_insertkey (bt, rightfence+1, *rightfence, lvl, page_no, time(NULL)) ) - return bt->err; + if( bt_getid (slotptr(rparent->page, idx)->id) != right->page_no ) + return bt->err = BTERR_struct; + + bt_putid (slotptr(rparent->page, idx)->id, child->page_no); - if( bt_removepage (bt, ppage_no, lvl, pagefence) ) + if( bt_update (bt, rparent->page, rparent->page_no) ) return bt->err; - if( bt_unlockpage (bt, right, BtLockParent) ) + if( bt_unlockpage (bt, rparent->page_no, BtLockWrite) ) return bt->err; - // wait for others to drain away + // save parent page fence value - if( bt_lockpage (bt, right, BtLockDelete) ) - return bt->err; + memcpy (pagefence, parent->page->fence, 256); - if( bt_lockpage (bt, right, BtLockWrite) ) + if( bt_update (bt, parent->page, parent->page_no) ) return bt->err; - - if( bt_mappage (bt, &bt->temp, right) ) + if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) return bt->err; - return bt_freepage (bt, bt->temp, right); + return bt_removepage (bt, parent->page_no, lvl, pagefence); } // remove page from btree @@ -1439,22 +1397,24 @@ uid right, ppage_no; BTERR bt_removepage (BtDb *bt, uid page_no, uint lvl, unsigned char *pagefence) { -BtPageSet parent[1]; +BtPageSet parent[1], rparent[1], sibling[1], set[1]; +unsigned char newfence[256]; uint slot, idx; BtKey ptr; -uid left; - parent->page = bt->parent; + parent->page = bt->parent; + set->page_no = page_no; + set->page = bt->page; // load and lock our parent -retry: + while( 1 ) { if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) ) return bt->err; // wait until we are posted in our parent - if( !bt->posted ) { + if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) { if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) return bt->err; #ifdef unix @@ -1462,37 +1422,74 @@ retry: #else SwitchToThread(); #endif - goto retry; + continue; } - // wait for others to finish - // and obtain final WriteLock + // can we do a simple merge entirely + // between siblings on the parent page? - if( bt_lockpage (bt, page_no, BtLockDelete) ) - return bt->err; + if( slot < parent->page->cnt ) { + // find our right neighbor + // right must exist because the stopper prevents + // the rightmost page from deleting - if( bt_lockpage (bt, page_no, BtLockWrite) ) - return bt->err; + for( idx = slot; idx++ < parent->page->cnt; ) + if( !slotptr(parent->page, idx)->dead ) + break; - if( bt_mappage (bt, &bt->page, page_no) ) - return bt->err; + sibling->page_no = bt_getid (slotptr (parent->page, idx)->id); - // was page re-established? + if( bt_lockpage (bt, set->page_no, BtLockDelete) ) + return bt->err; - if( bt->page->act ) { - if( bt_unlockpage (bt, page_no, BtLockWrite) ) + if( bt_lockpage (bt, set->page_no, BtLockWrite) ) return bt->err; - if( bt_unlockpage (bt, page_no, BtLockDelete) ) + + if( bt_mappage (bt, &set->page, set->page_no) ) return bt->err; - return bt_unlockpage (bt, parent->page_no, BtLockWrite); - } - - // can we do a simple merge entirely - // between siblings on the parent page? + // merge right if sibling shows up in + // our parent and is not being killed - if( slot < parent->page->cnt ) - return bt_mergeright(bt, page_no, parent, slot); + if( sibling->page_no == bt_getid (set->page->right) ) { + if( bt_lockpage (bt, sibling->page_no, BtLockParent) ) + return bt->err; + + if( bt_lockpage (bt, sibling->page_no, BtLockDelete) ) + return bt->err; + + if( bt_lockpage (bt, sibling->page_no, BtLockWrite) ) + return bt->err; + + sibling->page = bt->temp; + + if( bt_mappage (bt, &sibling->page, sibling->page_no) ) + return bt->err; + + if( !sibling->page->kill ) + return bt_mergeright(bt, set, parent, sibling, slot, idx); + + // try again later + + if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) ) + return bt->err; + } + + if( bt_unlockpage (bt, set->page_no, BtLockDelete) ) + return bt->err; + + if( bt_unlockpage (bt, set->page_no, BtLockWrite) ) + return bt->err; + + if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) + return bt->err; +#ifdef linux + sched_yield (); +#else + SwitchToThread(); +#endif + continue; + } // find our left neighbor in our parent page @@ -1502,51 +1499,127 @@ retry: // if no left neighbor, delete ourselves and our parent - if( !idx ) - return bt_removeparent (bt, page_no, parent, lvl+1); + if( !idx ) { + if( bt_lockpage (bt, set->page_no, BtLockAccess) ) + return bt->err; + + if( bt_lockpage (bt, set->page_no, BtLockWrite) ) + return bt->err; + + if( bt_unlockpage (bt, set->page_no, BtLockAccess) ) + return bt->err; + + if( bt_mappage (bt, &set->page, set->page_no) ) + return bt->err; + + rparent->page_no = bt_getid (parent->page->right); + rparent->page = bt->temp; + + if( bt_lockpage (bt, rparent->page_no, BtLockAccess) ) + return bt->err; + + if( bt_lockpage (bt, rparent->page_no, BtLockWrite) ) + return bt->err; + + if( bt_unlockpage (bt, rparent->page_no, BtLockAccess) ) + return bt->err; + + if( bt_mappage (bt, &rparent->page, rparent->page_no) ) + return bt->err; + + if( !rparent->page->kill ) { + sibling->page_no = bt_getid (set->page->right); + + if( bt_lockpage (bt, sibling->page_no, BtLockAccess) ) + return bt->err; + + if( bt_lockpage (bt, sibling->page_no, BtLockWrite) ) + return bt->err; + + if( bt_unlockpage (bt, sibling->page_no, BtLockAccess) ) + return bt->err; + + sibling->page = bt->temp2; + + if( bt_mappage (bt, &sibling->page, sibling->page_no) ) + return bt->err; + + if( !sibling->page->kill ) + return bt_removeparent (bt, set, parent, sibling, rparent, lvl+1); + + // try again later + + if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) ) + return bt->err; + } + + if( bt_unlockpage (bt, set->page_no, BtLockWrite) ) + return bt->err; + + if( bt_unlockpage (bt, rparent->page_no, BtLockWrite) ) + return bt->err; + + if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) + return bt->err; +#ifdef linux + sched_yield(); +#else + SwitchToThread(); +#endif + continue; + } - // lock and map our left neighbor's page + // redirect parent to our left sibling + // lock and map our left sibling's page - left = bt_getid (slotptr(parent->page, idx)->id); + sibling->page_no = bt_getid (slotptr(parent->page, idx)->id); + sibling->page = bt->temp; // wait our turn on fence key maintenance - if( bt_lockpage(bt, left, BtLockParent) ) + if( bt_lockpage(bt, sibling->page_no, BtLockParent) ) return bt->err; - if( bt_lockpage(bt, left, BtLockAccess) ) + if( bt_lockpage(bt, sibling->page_no, BtLockAccess) ) return bt->err; - if( bt_lockpage(bt, left, BtLockWrite) ) + if( bt_lockpage(bt, sibling->page_no, BtLockWrite) ) return bt->err; - if( bt_unlockpage(bt, left, BtLockAccess) ) + if( bt_unlockpage(bt, sibling->page_no, BtLockAccess) ) return bt->err; - if( bt_mappage (bt, &bt->temp, left) ) + if( bt_mappage (bt, &sibling->page, sibling->page_no) ) return bt->err; // wait until sibling is in our parent - if( bt_getid (bt->temp->right) != page_no ) { + if( bt_getid (sibling->page->right) != set->page_no ) { if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) return bt->err; - if( bt_unlockpage (bt, left, BtLockWrite) ) - return bt->err; - if( bt_unlockpage (bt, left, BtLockParent) ) + if( bt_unlockpage (bt, sibling->page_no, BtLockWrite) ) return bt->err; - if( bt_unlockpage (bt, page_no, BtLockWrite) ) - return bt->err; - if( bt_unlockpage (bt, page_no, BtLockDelete) ) + if( bt_unlockpage (bt, sibling->page_no, BtLockParent) ) return bt->err; #ifdef linux sched_yield(); #else SwitchToThread(); #endif - goto retry; + continue; } + // map page being killed + + if( bt_lockpage (bt, set->page_no, BtLockDelete) ) + return bt->err; + + if( bt_lockpage (bt, set->page_no, BtLockWrite) ) + return bt->err; + + if( bt_mappage (bt, &set->page, set->page_no) ) + return bt->err; + // delete our left sibling from parent slotptr(parent->page,idx)->dead = 1; @@ -1555,11 +1628,11 @@ retry: // redirect our parent slot to our left sibling - bt_putid (slotptr(parent->page, slot)->id, left); - - // update left page with our fence key + bt_putid (slotptr(parent->page, slot)->id, sibling->page_no); + memcpy (sibling->page->right, set->page->right, BtId); - memcpy (bt->temp->right, bt->page->right, BtId); + if( bt_update (bt, sibling->page, sibling->page_no) ) + return bt->err; // collapse dead slots from parent @@ -1575,21 +1648,37 @@ retry: if( bt_update (bt, parent->page, parent->page_no) ) return bt->err; + // free our original page + + if( bt_freepage (bt, set) ) + return bt->err; + // go down the left node's fence keys to the leaf level // and update the fence keys in each page - if( bt_fixfences (bt, left, pagefence) ) + memcpy (newfence, parent->page->fence, 256); + + if( bt_fixfences (bt, sibling, newfence) ) return bt->err; - if( bt_unlockpage (bt, parent->page_no, BtLockWrite) ) + // promote sibling as new root? + + if( parent->page_no == ROOT_page && parent->page->cnt == 1 ) + if( sibling->page->lvl ) { + if( bt_lockpage (bt, sibling->page_no, BtLockDelete) ) return bt->err; - // free our original page + if( bt_lockpage (bt, sibling->page_no, BtLockWrite) ) + return bt->err; - if( bt_mappage (bt, &bt->temp, page_no) ) + if( bt_mappage (bt, &sibling->page, set->page_no) ) return bt->err; - return bt_freepage (bt, bt->temp,page_no); + return bt_removeroot (bt, parent, sibling); + } + + return bt_unlockpage (bt, parent->page_no, BtLockWrite); + } } // find and delete key on page by marking delete flag bit @@ -1598,7 +1687,7 @@ retry: BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len) { unsigned char pagefence[256]; -uint slot, found, act, idx; +uint slot, found, idx; BtPageSet set[1]; BtKey ptr; @@ -1626,21 +1715,27 @@ BtKey ptr; memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot)); } else break; + } - if( bt_update(bt, set->page, set->page_no) ) + if( set->page->act ) { + if( bt_update(bt, set->page, set->page_no) ) return bt->err; - } + bt->found = found; + return bt_unlockpage (bt, set->page_no, BtLockWrite); + } // delete page when empty memcpy (pagefence, set->page->fence, 256); - act = set->page->act; + set->page->kill = 1; + + if( bt_update(bt, set->page, set->page_no) ) + return bt->err; if( bt_unlockpage(bt, set->page_no, BtLockWrite) ) return bt->err; - if( !act ) - if( bt_removepage (bt, set->page_no, 0, pagefence) ) + if( bt_removepage (bt, set->page_no, 0, pagefence) ) return bt->err; bt->found = found; @@ -1711,15 +1806,18 @@ BtKey key; if( slotptr(bt->frame,cnt)->dead ) continue; + off = slotptr(bt->frame,cnt)->off; + // copy key - if( !page->lvl || cnt < max ) { + + if( off >= sizeof(*page) ) { key = keyptr(bt->frame, cnt); off = nxt -= key->len + 1; memcpy ((unsigned char *)page + nxt, key, key->len + 1); - } else - off = offsetof(struct BtPage_, fence); + } // copy slot + memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId); slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod; slotptr(page, idx)->off = off; @@ -1793,9 +1891,9 @@ uid new_page; BTERR bt_splitpage (BtDb *bt, BtPageSet *set) { uint cnt = 0, idx = 0, max, nxt = bt->page_size, off; -uid right, page_no = set->page_no; unsigned char fencekey[256]; uint lvl = set->page->lvl; +uid right; BtKey key; // split higher half of keys to bt->frame @@ -1819,7 +1917,7 @@ BtKey key; bt->frame->act++; } - if( page_no == ROOT_page ) + if( set->page_no == ROOT_page ) bt->frame->posted = 1; memcpy (bt->frame->fence, set->page->fence, 256); @@ -1830,7 +1928,7 @@ BtKey key; // link right node - if( page_no > ROOT_page ) + if( set->page_no > ROOT_page ) memcpy (bt->frame->right, set->page->right, BtId); // get new free page and write higher keys to it. @@ -1877,65 +1975,53 @@ BtKey key; // if current page is the root page, split it - if( page_no == ROOT_page ) + if( set->page_no == ROOT_page ) return bt_splitroot (bt, set, right); - if( bt_update (bt, set->page, page_no) ) + if( bt_update (bt, set->page, set->page_no) ) return bt->err; - if( bt_unlockpage (bt, page_no, BtLockWrite) ) + if( bt_unlockpage (bt, set->page_no, BtLockWrite) ) return bt->err; // insert new fences in their parent pages while( 1 ) { - if( bt_lockpage (bt, page_no, BtLockParent) ) + if( bt_lockpage (bt, set->page_no, BtLockParent) ) return bt->err; - if( bt_lockpage (bt, page_no, BtLockRead) ) + if( bt_lockpage (bt, set->page_no, BtLockWrite) ) return bt->err; - if( bt_mappage (bt, &set->page, page_no) ) + if( bt_mappage (bt, &set->page, set->page_no) ) return bt->err; memcpy (fencekey, set->page->fence, 256); + right = bt_getid (set->page->right); if( set->page->posted ) { - if( bt_unlockpage (bt, page_no, BtLockParent) ) + if( bt_unlockpage (bt, set->page_no, BtLockParent) ) return bt->err; - return bt_unlockpage (bt, page_no, BtLockRead); + return bt_unlockpage (bt, set->page_no, BtLockWrite); } - if( bt_unlockpage (bt, page_no, BtLockRead) ) - return bt->err; - - if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) ) - return bt->err; - - if( bt_lockpage (bt, page_no, BtLockWrite) ) - return bt->err; - - if( bt_mappage (bt, &set->page, page_no) ) - return bt->err; - - right = bt_getid (set->page->right); set->page->posted = 1; - if( bt_update (bt, set->page, page_no) ) + if( bt_update (bt, set->page, set->page_no) ) return bt->err; - if( bt_unlockpage (bt, page_no, BtLockWrite) ) + if( bt_unlockpage (bt, set->page_no, BtLockWrite) ) return bt->err; - if( bt_unlockpage (bt, page_no, BtLockParent) ) + if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, set->page_no, time(NULL)) ) return bt->err; - if( !(page_no = right) ) - break; - - if( bt_mappage (bt, &set->page, page_no) ) + if( bt_unlockpage (bt, set->page_no, BtLockParent) ) return bt->err; + + if( !(set->page_no = right) ) + break; } return 0; @@ -2030,7 +2116,9 @@ uint slot; // cache page for retrieval if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) ) memcpy (bt->cursor, set->page, bt->page_size); + bt->cursor_page = set->page_no; + if ( bt_unlockpage(bt, set->page_no, BtLockRead) ) return 0;