From: unknown Date: Thu, 30 Jan 2014 00:16:14 +0000 (-0800) Subject: rework db_deletekey to fix problems X-Git-Url: https://pd.if.org/git/?p=btree;a=commitdiff_plain;h=86d1b01d3f45d7f692656e89cad3ecaf734a3469 rework db_deletekey to fix problems --- diff --git a/threads2h.c b/threads2h.c index 89e1894..db5c991 100644 --- a/threads2h.c +++ b/threads2h.c @@ -1,5 +1,5 @@ // btree version threads2h pthread rw lock version -// 24 JAN 2014 +// 29 JAN 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -125,7 +125,7 @@ typedef struct { typedef struct { BtLatch readwr[1]; // read/write page lock BtLatch access[1]; // Access Intent/Page delete - BtLatch parent[1]; // adoption of foster children + BtLatch parent[1]; // Posting of fence key in parent BtSpinLatch busy[1]; // slot is being moved between chains volatile ushort next; // next entry in hash table chain volatile ushort prev; // prev entry in hash table chain @@ -174,11 +174,9 @@ typedef struct Page { uint act; // count of active keys uint min; // next key offset unsigned char bits; // page size in bits - unsigned char lvl:6; // level of page - unsigned char kill:1; // page is being deleted + unsigned char lvl:7; // level of page unsigned char dirty:1; // page has deleted keys unsigned char right[BtId]; // page number to right - BtSlot table[0]; // array of key slots } *BtPage; // The memory mapping pool table buffer manager entry @@ -192,7 +190,7 @@ typedef struct { void *hashprev; // previous pool entry for the same hash idx void *hashnext; // next pool entry for the same hash idx #ifndef unix - HANDLE hmap; + HANDLE hmap; // Windows memory mapping handle #endif } BtPool; @@ -248,6 +246,7 @@ typedef struct { BtLatchSet *set; // current page latch set BtPool *pool; // current page pool unsigned char *mem; // frame, cursor, page memory buffer + int parent; // last loadpage was from a parent level int found; // last delete or insert was found int err; // last error } BtDb; @@ -265,12 +264,17 @@ typedef enum { // B-Tree functions extern void bt_close (BtDb *bt); extern BtDb *bt_open (BtMgr *mgr); -extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod); -extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl); +extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl); +extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len); extern uid bt_findkey (BtDb *bt, unsigned char *key, uint len); extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len); extern uint bt_nextkey (BtDb *bt, uint slot); +// internal functions +BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no); +uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot); +BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl); + // manager functions extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize); void bt_mgrclose (BtMgr *mgr); @@ -338,7 +342,7 @@ extern uint bt_tod (BtDb *bt, uint slot); // Access macros to address slot and key values from the page -#define slotptr(page, slot) (page->table + slot-1) +#define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1)) #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off)) void bt_putid(unsigned char *dest, uid id) @@ -1508,6 +1512,7 @@ BtLatchSet *set, *prevset; uint drill = 0xff, slot; uint mode, prevmode; BtPool *prevpool; +int parent = 1; // start at root of btree and drill down @@ -1567,30 +1572,33 @@ BtPool *prevpool; // find key on page at this level // and descend to requested level - if( !bt->page->kill && (slot = bt_findslot (bt, key, len)) ) { + if( slot = bt_findslot (bt, key, len) ) { if( drill == lvl ) - return slot; + return bt->parent = parent, slot; while( slotptr(bt->page, slot)->dead ) if( slot++ < bt->page->cnt ) continue; else { page_no = bt_getid(bt->page->right); + parent = 0; goto slideright; } page_no = bt_getid(slotptr(bt->page, slot)->id); + parent = 1; drill--; } // or slide right into next page - // (slide left from deleted page) - else + else { page_no = bt_getid(bt->page->right); + parent = 0; + } // continue down / right using overlapping locks - // to protect pages being killed or split. + // to protect pages being split. slideright: prevpage = bt->page_no; @@ -1605,119 +1613,384 @@ slideright: return 0; // return error } -// find and delete key on page by marking delete flag bit -// when page becomes empty, delete it +// remove empty page from the B-tree +// by pulling our right node left over ourselves + +// call with bt->page, etc, set to page's locked parent +// returns with page locked. -BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl) +BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot) { -unsigned char lowerkey[256], higherkey[256]; -BtLatchSet *rset, *set; -BtPool *pool, *rpool; -uid page_no, right; -uint slot, tod; -BtPage rpage; +BtLatchSet *rset, *pset, *rpset; +BtPool *rpool, *ppool, *rppool; +BtPage rpage, ppage, rppage; +uid right, parent, rparent; BtKey ptr; +uint idx; - if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) ) - ptr = keyptr(bt->page, slot); + // cache node's parent page + + parent = bt->page_no; + ppage = bt->page; + ppool = bt->pool; + pset = bt->set; + + // lock and map our right page + // it cannot be NULL because of the stopper + // in the last right page + + bt_lockpage (BtLockWrite, set); + + // if we aren't dead yet + + if( page->act ) + goto rmergexit; + + if( right = bt_getid (page->right) ) + if( rpool = bt_pinpool (bt, right) ) + rpage = bt_page (bt, rpool, right); + else + return bt->err; else + return bt->err = BTERR_struct; + + rset = bt_pinlatch (bt, right); + + // find our right neighbor + + if( ppage->act > 1 ) { + for( idx = slot; idx++ < ppage->cnt; ) + if( !slotptr(ppage, idx)->dead ) + break; + + if( idx > ppage->cnt ) + return bt->err = BTERR_struct; + + // redirect right neighbor in parent to left node + + bt_putid(slotptr(ppage,idx)->id, page_no); + } + + // if parent has only our deleted page, e.g. no right neighbor + // prepare to merge parent itself + + if( ppage->act == 1 ) { + if( rparent = bt_getid (ppage->right) ) + if( rppool = bt_pinpool (bt, rparent) ) + rppage = bt_page (bt, rppool, rparent); + else return bt->err; + else + return bt->err = BTERR_struct; - // if key is found delete it, otherwise ignore request + rpset = bt_pinlatch (bt, rparent); + bt_lockpage (BtLockWrite, rpset); - if( bt->found = !keycmp (ptr, key, len) ) - if( bt->found = slotptr(bt->page, slot)->dead == 0 ) { - slotptr(bt->page,slot)->dead = 1; - if( slot < bt->page->cnt ) - bt->page->dirty = 1; - bt->page->act--; + // find our right neighbor on right parent page + + for( idx = 0; idx++ < rppage->cnt; ) + if( !slotptr(rppage, idx)->dead ) { + bt_putid (slotptr(rppage, idx)->id, page_no); + break; } - // return if page is not empty, or it has no right sibling + if( idx > rppage->cnt ) + return bt->err = BTERR_struct; + } - right = bt_getid(bt->page->right); - page_no = bt->page_no; - pool = bt->pool; - set = bt->set; + // now that there are no more pointers to our right node + // we can wait for delete lock on it - if( !right || bt->page->act ) { - bt_unlockpage(BtLockWrite, set); - bt_unpinlatch (set); - bt_unpinpool (pool); + bt_lockpage(BtLockDelete, rset); + bt_lockpage(BtLockWrite, rset); + + // pull contents of right page into our empty page + + memcpy (page, rpage, bt->mgr->page_size); + + // ready to release right parent lock + // now that we have a new page in place + + if( ppage->act == 1 ) { + bt_unlockpage (BtLockWrite, rpset); + bt_unpinlatch (rpset); + bt_unpinpool (rppool); + } + + // add killed right block to free chain + // lock latch mgr + + bt_spinwritelock(bt->mgr->latchmgr->lock); + + // store free chain in allocation page second right + + bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right)); + bt_putid(bt->mgr->latchmgr->alloc[1].right, right); + + // unlock latch mgr and right page + + bt_unlockpage(BtLockDelete, rset); + bt_unlockpage(BtLockWrite, rset); + bt_unpinlatch (rset); + bt_unpinpool (rpool); + + bt_spinreleasewrite(bt->mgr->latchmgr->lock); + + // delete our obsolete fence key from our parent + + slotptr(ppage, slot)->dead = 1; + ppage->dirty = 1; + + // if our parent now empty + // remove it from the tree + + if( ppage->act-- == 1 ) + if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) ) + return bt->err; + +rmergexit: + bt_unlockpage (BtLockWrite, pset); + bt_unpinlatch (pset); + bt_unpinpool (ppool); + + bt->found = 1; + return bt->err = 0; +} + +// remove empty page from the B-tree +// try merging left first. If no left +// sibling, then merge right. + +// call with page loaded and locked, +// return with page locked. + +BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl) +{ +unsigned char fencekey[256], postkey[256]; +uint slot, idx, postfence = 0; +BtLatchSet *lset, *pset; +BtPool *lpool, *ppool; +BtPage lpage, ppage; +uid left, parent; +BtKey ptr; + + ptr = keyptr(page, page->cnt); + memcpy(fencekey, ptr, ptr->len + 1); + bt_unlockpage (BtLockWrite, set); + + // load and lock our parent + +retry: + if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) ) return bt->err; + + parent = bt->page_no; + ppage = bt->page; + ppool = bt->pool; + pset = bt->set; + + // wait until we are posted in our parent + + if( !bt->parent ) { + bt_unlockpage (BtLockWrite, pset); + bt_unpinlatch (pset); + bt_unpinpool (ppool); +#ifdef unix + sched_yield(); +#else + SwitchToThread(); +#endif + goto retry; } - // obtain Parent lock over write lock + // find our left neighbor in our parent page - bt_lockpage(BtLockParent, set); + for( idx = slot; --idx; ) + if( !slotptr(ppage, idx)->dead ) + break; - // keep copy of key to delete + // if no left neighbor, do right merge - ptr = keyptr(bt->page, bt->page->cnt); - memcpy(lowerkey, ptr, ptr->len + 1); + if( !idx ) + return bt_mergeright (bt, page, pool, set, page_no, lvl, slot); - // lock and map right page + // lock and map our left neighbor's page - if( rpool = bt_pinpool (bt, right) ) - rpage = bt_page (bt, rpool, right); + left = bt_getid (slotptr(ppage, idx)->id); + + if( lpool = bt_pinpool (bt, left) ) + lpage = bt_page (bt, lpool, left); else return bt->err; - rset = bt_pinlatch (bt, right); - bt_lockpage(BtLockWrite, rset); + lset = bt_pinlatch (bt, left); + bt_lockpage(BtLockWrite, lset); - // pull contents of next page into current empty page + // wait until sibling is in our parent - memcpy (bt->page, rpage, bt->mgr->page_size); + if( bt_getid (lpage->right) != page_no ) { + bt_unlockpage (BtLockWrite, pset); + bt_unpinlatch (pset); + bt_unpinpool (ppool); + bt_unlockpage (BtLockWrite, lset); + bt_unpinlatch (lset); + bt_unpinpool (lpool); +#ifdef linux + sched_yield(); +#else + SwitchToThread(); +#endif + goto retry; + } - // keep copy of key to update + // since our page will have no more pointers to it, + // obtain Delete lock and wait for write locks to clear - ptr = keyptr(rpage, rpage->cnt); - memcpy(higherkey, ptr, ptr->len + 1); + bt_lockpage(BtLockDelete, set); + bt_lockpage(BtLockWrite, set); - // Mark right page as deleted and point it to left page - // until we can post updates at higher level. + // if we aren't dead yet, + // get ready for exit - bt_putid(rpage->right, page_no); - rpage->kill = 1; - rpage->cnt = 0; + if( page->act ) { + bt_unlockpage(BtLockDelete, set); + bt_unlockpage(BtLockWrite, lset); + bt_unpinlatch (lset); + bt_unpinpool (lpool); + goto lmergexit; + } - bt_unlockpage(BtLockWrite, rset); - bt_unlockpage(BtLockWrite, set); + // are we are the fence key for our parent? + // if so, grab our old fence key - // delete old lower key to consolidated node + if( postfence = slot == ppage->cnt ) { + ptr = keyptr (ppage, ppage->cnt); + memcpy(fencekey, ptr, ptr->len + 1); + memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot)); - if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) ) - return bt->err; + // clear out other dead slots - // redirect higher key directly to consolidated node + while( --ppage->cnt ) + if( slotptr(ppage, ppage->cnt)->dead ) + memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot)); + else + break; - tod = (uint)time(NULL); + ptr = keyptr (ppage, ppage->cnt); + memcpy(postkey, ptr, ptr->len + 1); + } else + slotptr(ppage,slot)->dead = 1; - if( bt_insertkey (bt, higherkey+1, *higherkey, lvl + 1, page_no, tod) ) - return bt->err; + ppage->dirty = 1; + ppage->act--; - // add killed right block to free chain + // push our right neighbor pointer to our left + + memcpy (lpage->right, page->right, BtId); + + // add ourselves to free chain // lock latch mgr bt_spinwritelock(bt->mgr->latchmgr->lock); // store free chain in allocation page second right - bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right)); - bt_putid(bt->mgr->latchmgr->alloc[1].right, right); + bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right)); + bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no); - // unlock latch mgr and unpin right page + // unlock latch mgr and pages bt_spinreleasewrite(bt->mgr->latchmgr->lock); - bt_unpinlatch (rset); - bt_unpinpool (rpool); + bt_unlockpage(BtLockWrite, lset); + bt_unpinlatch (lset); + bt_unpinpool (lpool); + + // release our node's delete lock + + bt_unlockpage(BtLockDelete, set); + +lmergexit: + bt_unlockpage (BtLockWrite, pset); + bt_unpinpool (ppool); + + // do we need to post parent's fence key in its parent? - // remove ParentModify lock + if( !postfence || parent == ROOT_page ) { + bt_unpinlatch (pset); + bt->found = 1; + return bt->err = 0; + } + + // interlock parent fence post + + bt_lockpage (BtLockParent, pset); + + // load parent's parent page +posttry: + if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) ) + return bt->err; + + if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) ) + if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) ) + return bt->err; + else + goto posttry; + + page = bt->page; + + page->min -= *postkey + 1; + ((unsigned char *)page)[page->min] = *postkey; + memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey ); + slotptr(page, slot)->off = page->min; + + bt_unlockpage (BtLockParent, pset); + bt_unpinlatch (pset); + + bt_unlockpage (BtLockWrite, bt->set); + bt_unpinlatch (bt->set); + bt_unpinpool (bt->pool); - bt_unlockpage(BtLockParent, set); + bt->found = 1; + return bt->err = 0; +} + +// find and delete key on page by marking delete flag bit +// if page becomes empty, delete it from the btree + +BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len) +{ +BtLatchSet *set; +BtPool *pool; +BtPage page; +uid page_no; +BtKey ptr; +uint slot; + + if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) ) + return bt->err; + + page_no = bt->page_no; + page = bt->page; + pool = bt->pool; + set = bt->set; + + // if key is found delete it, otherwise ignore request + + ptr = keyptr(page, slot); + + if( bt->found = !keycmp (ptr, key, len) ) + if( bt->found = slotptr(page, slot)->dead == 0 ) { + slotptr(page,slot)->dead = 1; + if( slot < page->cnt ) + page->dirty = 1; + if( !--page->act ) + if( bt_mergeleft (bt, page, pool, set, page_no, 0) ) + return bt->err; + } + + bt_unlockpage(BtLockWrite, set); bt_unpinlatch (set); bt_unpinpool (pool); - return 0; + return bt->err = 0; } // find key in leaf level and return row-id @@ -1736,7 +2009,7 @@ uid id; // if key exists, return row-id // otherwise return 0 - if( ptr->len == len && !memcmp (ptr->key, key, len) ) + if( slot <= bt->page->cnt && !keycmp (ptr, key, len) ) id = bt_getid(slotptr(bt->page,slot)->id); else id = 0; @@ -1749,16 +2022,15 @@ uid id; // check page for space available, // clean if necessary and return -// =0 - page needs splitting -// >0 - go ahead at returned slot +// 0 - page needs splitting +// >0 new slot value -uint bt_cleanpage(BtDb *bt, uint amt, uint slot) +uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot) { uint nxt = bt->mgr->page_size; -BtPage page = bt->page; uint cnt = 0, idx = 0; uint max = page->cnt; -uint newslot; +uint newslot = max; BtKey key; if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 ) @@ -1777,15 +2049,19 @@ BtKey key; page->dirty = 0; page->act = 0; - // always leave fence key in list + // try cleaning up page first + + // always leave fence key in the array + // otherwise, remove deleted key while( cnt++ < max ) { if( cnt == slot ) newslot = idx + 1; - else if( cnt < max && slotptr(bt->frame,cnt)->dead ) + if( cnt < max && slotptr(bt->frame,cnt)->dead ) continue; // copy key + key = keyptr(bt->frame, cnt); nxt -= key->len + 1; memcpy ((unsigned char *)page + nxt, key, key->len + 1); @@ -1797,26 +2073,59 @@ BtKey key; slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod; slotptr(page, idx)->off = nxt; } + page->min = nxt; page->cnt = idx; + // see if page has enough space now, or does it need splitting? + if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 ) return newslot; return 0; } -// split the root and raise the height of the btree +// add key to current page +// page must already be writelocked -BTERR bt_splitroot(BtDb *bt, unsigned char *newkey, unsigned char *oldkey, uid page_no2) +void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod) +{ +uint idx; + + // find next available dead slot and copy key onto page + + for( idx = slot; idx < page->cnt; idx++ ) + if( slotptr(page, idx)->dead ) + break; + + if( idx == page->cnt ) + idx++, page->cnt++; + + page->act++; + + // now insert key into array before slot + + while( idx > slot ) + *slotptr(page, idx) = *slotptr(page, idx -1), idx--; + + page->min -= len + 1; + ((unsigned char *)page)[page->min] = len; + memcpy ((unsigned char *)page + page->min +1, key, len ); + + bt_putid(slotptr(page,slot)->id, id); + slotptr(page, slot)->off = page->min; + slotptr(page, slot)->tod = tod; + slotptr(page, slot)->dead = 0; +} + +BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2) { uint nxt = bt->mgr->page_size; BtPage root = bt->page; uid new_page; // Obtain an empty page to use, and copy the current - // root contents into it which is the lower half of - // the old root. + // root contents into it if( !(new_page = bt_newpage(bt, root)) ) return bt->err; @@ -1828,16 +2137,17 @@ uid new_page; // insert first key on newroot page - nxt -= *newkey + 1; - memcpy ((unsigned char *)root + nxt, newkey, *newkey + 1); + nxt -= *leftkey + 1; + memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1); bt_putid(slotptr(root, 1)->id, new_page); slotptr(root, 1)->off = nxt; - // insert second key on newroot page + // insert second key (stopper key) on newroot page // and increase the root height - nxt -= *oldkey + 1; - memcpy ((unsigned char *)root + nxt, oldkey, *oldkey + 1); + nxt -= 3; + *((unsigned char *)root + nxt) = 2; + memset ((unsigned char *)root + nxt + 1, 0xff, 2); bt_putid(slotptr(root, 2)->id, page_no2); slotptr(root, 2)->off = nxt; @@ -1856,31 +2166,26 @@ uid new_page; } // split already locked full node -// return unlocked. +// return unlocked and unpinned. -BTERR bt_splitpage (BtDb *bt) +BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no) { -uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size; -unsigned char oldkey[256], lowerkey[256]; -uid page_no = bt->page_no, right; -BtLatchSet *nset, *set = bt->set; -BtPool *pool = bt->pool; -BtPage page = bt->page; +uint slot, cnt, idx, max, nxt = bt->mgr->page_size; +unsigned char rightkey[256], leftkey[256]; +uint tod = time(NULL); uint lvl = page->lvl; uid new_page; BtKey key; -uint tod; - // split higher half of keys to bt->frame - // the last key (fence key) might be dead - - tod = (uint)time(NULL); + // initialize frame buffer for right node memset (bt->frame, 0, bt->mgr->page_size); - max = (int)page->cnt; + max = page->cnt; cnt = max / 2; idx = 0; + // split higher half of keys to bt->frame + while( cnt++ < max ) { key = keyptr(page, cnt); nxt -= key->len + 1; @@ -1892,166 +2197,142 @@ uint tod; slotptr(bt->frame, idx)->off = nxt; } - // remember existing fence key for new page to the right + // transfer right link node to new right node - memcpy (oldkey, key, key->len + 1); + if( page_no > ROOT_page ) + memcpy (bt->frame->right, page->right, BtId); bt->frame->bits = bt->mgr->page_bits; bt->frame->min = nxt; bt->frame->cnt = idx; bt->frame->lvl = lvl; - // link right node - - if( page_no > ROOT_page ) { - right = bt_getid (page->right); - bt_putid(bt->frame->right, right); - } - - // get new free page and write frame to it. + // get new free page and write right frame to it. if( !(new_page = bt_newpage(bt, bt->frame)) ) return bt->err; + // remember fence key for new right page to add + // as right sibling to the left node + + key = keyptr(bt->frame, idx); + memcpy (rightkey, key, key->len + 1); + // update lower keys to continue in old page memcpy (bt->frame, page, bt->mgr->page_size); memset (page+1, 0, bt->mgr->page_size - sizeof(*page)); nxt = bt->mgr->page_size; + page->dirty = 0; page->act = 0; cnt = 0; idx = 0; // assemble page of smaller keys - // (they're all active keys) + // to remain in the old page while( cnt++ < max / 2 ) { key = keyptr(bt->frame, cnt); nxt -= key->len + 1; memcpy ((unsigned char *)page + nxt, key, key->len + 1); - memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId); + memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId); + if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) ) + page->act++; slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod; slotptr(page, idx)->off = nxt; - page->act++; } - // remember fence key for old page + // finalize left page and save fence key - memcpy(lowerkey, key, key->len + 1); - bt_putid(page->right, new_page); + memcpy(leftkey, key, key->len + 1); page->min = nxt; page->cnt = idx; + // link new right page + + bt_putid (page->right, new_page); + // if current page is the root page, split it if( page_no == ROOT_page ) - return bt_splitroot (bt, lowerkey, oldkey, new_page); + return bt_splitroot (bt, leftkey, new_page); - // obtain Parent/Write locks - // for left and right node pages + // obtain ParentModification lock for current page - nset = bt_pinlatch (bt, new_page); - - bt_lockpage (BtLockParent, nset); bt_lockpage (BtLockParent, set); - // release wr lock on left page - // (keep the SMO in sequence) + // release wr lock on our page. + // this will keep out another SMO bt_unlockpage (BtLockWrite, set); - // insert new fence for reformulated left block + // insert key for old page (lower keys) - if( bt_insertkey (bt, lowerkey+1, *lowerkey, lvl + 1, page_no, tod) ) + if( bt_insertkey (bt, leftkey + 1, *leftkey, page_no, tod, lvl + 1) ) return bt->err; - // fix old fence for newly allocated right block page + // switch old parent key from us to our right page - if( bt_insertkey (bt, oldkey+1, *oldkey, lvl + 1, new_page, tod) ) + if( bt_insertkey (bt, rightkey + 1, *rightkey, new_page, tod, lvl + 1) ) return bt->err; - // release Parent locks + // unlock and unpin - bt_unlockpage (BtLockParent, nset); bt_unlockpage (BtLockParent, set); - bt_unpinlatch (nset); bt_unpinlatch (set); bt_unpinpool (pool); return 0; } -// Insert new key into the btree at requested level. -// Level zero pages are leaf pages. Page is unlocked at exit. +// Insert new key into the btree at given level. -BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod) +BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl) { uint slot, idx; BtPage page; BtKey ptr; - while( 1 ) { - if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) ) - ptr = keyptr(bt->page, slot); - else - { - if ( !bt->err ) - bt->err = BTERR_ovflw; - return bt->err; - } - - // if key already exists, update id and return - - page = bt->page; - - if( bt->found = !keycmp (ptr, key, len) ) { - slotptr(page, slot)->dead = 0; - slotptr(page, slot)->tod = tod; - bt_putid(slotptr(page,slot)->id, id); - bt_unlockpage(BtLockWrite, bt->set); - bt_unpinlatch(bt->set); - bt_unpinpool (bt->pool); - return bt->err; - } - - // check if page has enough space - - if( slot = bt_cleanpage (bt, len, slot) ) - break; - - if( bt_splitpage (bt) ) - return bt->err; - } - - // calculate next available slot and copy key into page + while( 1 ) { + if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) ) + ptr = keyptr(bt->page, slot); + else + { + if ( !bt->err ) + bt->err = BTERR_ovflw; + return bt->err; + } - page->min -= len + 1; // reset lowest used offset - ((unsigned char *)page)[page->min] = len; - memcpy ((unsigned char *)page + page->min +1, key, len ); + // if key already exists, update id and return - for( idx = slot; idx < page->cnt; idx++ ) - if( slotptr(page, idx)->dead ) - break; + page = bt->page; - // now insert key into array before slot - // preserving the fence slot + if( !keycmp (ptr, key, len) ) { + if( slotptr(page, slot)->dead ) + page->act++; + slotptr(page, slot)->dead = 0; + slotptr(page, slot)->tod = tod; + bt_putid(slotptr(page,slot)->id, id); + bt_unlockpage(BtLockWrite, bt->set); + bt_unpinlatch (bt->set); + bt_unpinpool (bt->pool); + return bt->err; + } - if( idx == page->cnt ) - idx++, page->cnt++; + // check if page has enough space - page->act++; + if( slot = bt_cleanpage (bt, bt->page, len, slot) ) + break; - while( idx > slot ) - *slotptr(page, idx) = *slotptr(page, idx -1), idx--; + if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) ) + return bt->err; + } - bt_putid(slotptr(page,slot)->id, id); - slotptr(page, slot)->off = page->min; - slotptr(page, slot)->tod = tod; - slotptr(page, slot)->dead = 0; + bt_addkeytopage (bt, bt->page, slot, key, len, id, tod); - bt_unlockpage (BtLockWrite, bt->set); - bt_unpinlatch (bt->set); - bt_unpinpool (bt->pool); - return 0; + bt_unlockpage (BtLockWrite, bt->set); + bt_unpinlatch (bt->set); + bt_unpinpool (bt->pool); + return 0; } // cache page of keys into cursor and return starting slot for given key @@ -2063,7 +2344,9 @@ uint slot; // cache page for retrieval if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) ) memcpy (bt->cursor, bt->page, bt->mgr->page_size); + bt->cursor_page = bt->page_no; + bt_unlockpage(BtLockRead, bt->set); bt_unpinlatch (bt->set); bt_unpinpool (bt->pool); @@ -2075,6 +2358,7 @@ uint slot; uint bt_nextkey (BtDb *bt, uint slot) { +BtLatchSet *set; BtPool *pool; BtPage page; uid right; @@ -2084,7 +2368,7 @@ uid right; while( slot++ < bt->cursor->cnt ) if( slotptr(bt->cursor,slot)->dead ) continue; - else if( right || (slot < bt->cursor->cnt)) + else if( right || (slot < bt->cursor->cnt) ) return slot; else break; @@ -2093,19 +2377,18 @@ uid right; break; bt->cursor_page = right; - if( pool = bt_pinpool (bt, right) ) page = bt_page (bt, pool, right); else return 0; - bt->set = bt_pinlatch (bt, right); - bt_lockpage(BtLockRead, bt->set); + set = bt_pinlatch (bt, right); + bt_lockpage(BtLockRead, set); memcpy (bt->cursor, page, bt->mgr->page_size); - bt_unlockpage(BtLockRead, bt->set); - bt_unpinlatch (bt->set); + bt_unlockpage(BtLockRead, set); + bt_unpinlatch (set); bt_unpinpool (pool); slot = 0; } while( 1 ); @@ -2128,55 +2411,8 @@ uint bt_tod(BtDb *bt, uint slot) return slotptr(bt->cursor,slot)->tod; } -#ifdef STANDALONE - -void bt_latchaudit (BtDb *bt) -{ -ushort idx, hashidx; -BtLatchSet *set; -BtPool *pool; -BtPage page; -uid page_no; - -#ifdef unix - for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) { - set = bt->mgr->latchsets + idx; - if( *(ushort *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent ) { - fprintf(stderr, "latchset %d locked for page %6x\n", idx, set->page_no); - *(ushort *)set->readwr = 0; - *(ushort *)set->access = 0; - *(ushort *)set->parent = 0; - } - if( set->pin ) { - fprintf(stderr, "latchset %d pinned\n", idx); - set->pin = 0; - } - } - - for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) { - if( *(uint *)bt->mgr->latchmgr->table[hashidx].latch ) - fprintf(stderr, "latchmgr locked\n"); - if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do { - set = bt->mgr->latchsets + idx; - if( *(uint *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent ) - fprintf(stderr, "latchset %d locked\n", idx); - if( set->hash != hashidx ) - fprintf(stderr, "latchset %d wrong hashidx\n", idx); - if( set->pin ) - fprintf(stderr, "latchset %d pinned\n", idx); - } while( idx = set->next ); - } - page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right); - while( page_no ) { - fprintf(stderr, "free: %.6x\n", (uint)page_no); - pool = bt_pinpool (bt, page_no); - page = bt_page (bt, pool, page_no); - page_no = bt_getid(page->right); - bt_unpinpool (pool); - } -#endif -} +#ifdef STANDALONE typedef struct { char type, idx; @@ -2199,6 +2435,7 @@ uid next, page_no = LEAF_page; // start on first page of leaves unsigned char key[256]; ThreadArg *args = arg; int ch, len = 0, slot; +BtLatchSet *set; time_t tod[1]; BtPool *pool; BtPage page; @@ -2211,12 +2448,6 @@ FILE *in; switch(args->type | 0x20) { - case 'a': - fprintf(stderr, "started latch mgr audit\n"); - bt_latchaudit (bt); - fprintf(stderr, "finished latch mgr audit\n"); - break; - case 'w': fprintf(stderr, "started indexing for %s\n", args->infile); if( in = fopen (args->infile, "rb") ) @@ -2231,7 +2462,7 @@ FILE *in; else if( args->num ) sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - if( bt_insertkey (bt, key, len, 0, line, *tod) ) + if( bt_insertkey (bt, key, len, line, *tod, 0) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); len = 0; } @@ -2253,7 +2484,7 @@ FILE *in; else if( args->num ) sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - if( bt_deletekey (bt, key, len, 0) ) + if( bt_deletekey (bt, key, len) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); len = 0; } @@ -2308,17 +2539,17 @@ FILE *in; fprintf(stderr, "started reading\n"); do { - if( bt->pool = bt_pinpool (bt, page_no) ) - page = bt_page (bt, bt->pool, page_no); + if( pool = bt_pinpool (bt, page_no) ) + page = bt_page (bt, pool, page_no); else break; - bt->set = bt_pinlatch (bt, page_no); - bt_lockpage (BtLockRead, bt->set); + set = bt_pinlatch (bt, page_no); + bt_lockpage (BtLockRead, set); cnt += page->act; next = bt_getid (page->right); - bt_unlockpage (BtLockRead, bt->set); - bt_unpinlatch (bt->set); - bt_unpinpool (bt->pool); + bt_unlockpage (BtLockRead, set); + bt_unpinlatch (set); + bt_unpinpool (pool); } while( page_no = next ); cnt--; // remove stopper key diff --git a/threads2i.c b/threads2i.c index 25feba3..e53e31d 100644 --- a/threads2i.c +++ b/threads2i.c @@ -1,5 +1,5 @@ // btree version threads2i sched_yield version -// 24 JAN 2014 +// 29 JAN 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -162,16 +162,13 @@ typedef struct { // by the BtSlot array of keys. typedef struct Page { - BtLatchSet latch[1]; // Set of three latches uint cnt; // count of keys in page uint act; // count of active keys uint min; // next key offset unsigned char bits; // page size in bits - unsigned char lvl:6; // level of page - unsigned char kill:1; // page is being deleted + unsigned char lvl:7; // level of page unsigned char dirty:1; // page has deleted keys unsigned char right[BtId]; // page number to right - BtSlot table[0]; // array of key slots } *BtPage; // The memory mapping pool table buffer manager entry @@ -185,7 +182,7 @@ typedef struct { void *hashprev; // previous pool entry for the same hash idx void *hashnext; // next pool entry for the same hash idx #ifndef unix - HANDLE hmap; + HANDLE hmap; // Windows memory mapping handle #endif } BtPool; @@ -241,6 +238,7 @@ typedef struct { BtLatchSet *set; // current page latch set BtPool *pool; // current page pool unsigned char *mem; // frame, cursor, page memory buffer + int parent; // last loadpage was from a parent level int found; // last delete or insert was found int err; // last error } BtDb; @@ -258,12 +256,17 @@ typedef enum { // B-Tree functions extern void bt_close (BtDb *bt); extern BtDb *bt_open (BtMgr *mgr); -extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod); -extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl); +extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl); +extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len); extern uid bt_findkey (BtDb *bt, unsigned char *key, uint len); extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len); extern uint bt_nextkey (BtDb *bt, uint slot); +// internal functions +BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no); +uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot); +BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl); + // manager functions extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize); void bt_mgrclose (BtMgr *mgr); @@ -331,7 +334,7 @@ extern uint bt_tod (BtDb *bt, uint slot); // Access macros to address slot and key values from the page -#define slotptr(page, slot) (page->table + slot-1) +#define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1)) #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off)) void bt_putid(unsigned char *dest, uid id) @@ -1414,6 +1417,7 @@ BtLatchSet *set, *prevset; uint drill = 0xff, slot; uint mode, prevmode; BtPool *prevpool; +int parent = 1; // start at root of btree and drill down @@ -1473,30 +1477,33 @@ BtPool *prevpool; // find key on page at this level // and descend to requested level - if( !bt->page->kill && (slot = bt_findslot (bt, key, len)) ) { + if( slot = bt_findslot (bt, key, len) ) { if( drill == lvl ) - return slot; + return bt->parent = parent, slot; while( slotptr(bt->page, slot)->dead ) if( slot++ < bt->page->cnt ) continue; else { page_no = bt_getid(bt->page->right); + parent = 0; goto slideright; } page_no = bt_getid(slotptr(bt->page, slot)->id); + parent = 1; drill--; } // or slide right into next page - // (slide left from deleted page) - else + else { page_no = bt_getid(bt->page->right); + parent = 0; + } // continue down / right using overlapping locks - // to protect pages being killed or split. + // to protect pages being split. slideright: prevpage = bt->page_no; @@ -1511,119 +1518,384 @@ slideright: return 0; // return error } -// find and delete key on page by marking delete flag bit -// when page becomes empty, delete it +// remove empty page from the B-tree +// by pulling our right node left over ourselves + +// call with bt->page, etc, set to page's locked parent +// returns with page locked. -BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl) +BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot) { -unsigned char lowerkey[256], higherkey[256]; -BtLatchSet *rset, *set; -BtPool *pool, *rpool; -uid page_no, right; -uint slot, tod; -BtPage rpage; +BtLatchSet *rset, *pset, *rpset; +BtPool *rpool, *ppool, *rppool; +BtPage rpage, ppage, rppage; +uid right, parent, rparent; BtKey ptr; +uint idx; - if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) ) - ptr = keyptr(bt->page, slot); + // cache node's parent page + + parent = bt->page_no; + ppage = bt->page; + ppool = bt->pool; + pset = bt->set; + + // lock and map our right page + // it cannot be NULL because of the stopper + // in the last right page + + bt_lockpage (BtLockWrite, set); + + // if we aren't dead yet + + if( page->act ) + goto rmergexit; + + if( right = bt_getid (page->right) ) + if( rpool = bt_pinpool (bt, right) ) + rpage = bt_page (bt, rpool, right); + else + return bt->err; else + return bt->err = BTERR_struct; + + rset = bt_pinlatch (bt, right); + + // find our right neighbor + + if( ppage->act > 1 ) { + for( idx = slot; idx++ < ppage->cnt; ) + if( !slotptr(ppage, idx)->dead ) + break; + + if( idx > ppage->cnt ) + return bt->err = BTERR_struct; + + // redirect right neighbor in parent to left node + + bt_putid(slotptr(ppage,idx)->id, page_no); + } + + // if parent has only our deleted page, e.g. no right neighbor + // prepare to merge parent itself + + if( ppage->act == 1 ) { + if( rparent = bt_getid (ppage->right) ) + if( rppool = bt_pinpool (bt, rparent) ) + rppage = bt_page (bt, rppool, rparent); + else return bt->err; + else + return bt->err = BTERR_struct; - // if key is found delete it, otherwise ignore request + rpset = bt_pinlatch (bt, rparent); + bt_lockpage (BtLockWrite, rpset); - if( bt->found = !keycmp (ptr, key, len) ) - if( bt->found = slotptr(bt->page, slot)->dead == 0 ) { - slotptr(bt->page,slot)->dead = 1; - if( slot < bt->page->cnt ) - bt->page->dirty = 1; - bt->page->act--; + // find our right neighbor on right parent page + + for( idx = 0; idx++ < rppage->cnt; ) + if( !slotptr(rppage, idx)->dead ) { + bt_putid (slotptr(rppage, idx)->id, page_no); + break; } - // return if page is not empty, or it has no right sibling + if( idx > rppage->cnt ) + return bt->err = BTERR_struct; + } - right = bt_getid(bt->page->right); - page_no = bt->page_no; - pool = bt->pool; - set = bt->set; + // now that there are no more pointers to our right node + // we can wait for delete lock on it - if( !right || bt->page->act ) { - bt_unlockpage(BtLockWrite, set); - bt_unpinlatch (set); - bt_unpinpool (pool); + bt_lockpage(BtLockDelete, rset); + bt_lockpage(BtLockWrite, rset); + + // pull contents of right page into our empty page + + memcpy (page, rpage, bt->mgr->page_size); + + // ready to release right parent lock + // now that we have a new page in place + + if( ppage->act == 1 ) { + bt_unlockpage (BtLockWrite, rpset); + bt_unpinlatch (rpset); + bt_unpinpool (rppool); + } + + // add killed right block to free chain + // lock latch mgr + + bt_spinwritelock(bt->mgr->latchmgr->lock); + + // store free chain in allocation page second right + + bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right)); + bt_putid(bt->mgr->latchmgr->alloc[1].right, right); + + // unlock latch mgr and right page + + bt_unlockpage(BtLockDelete, rset); + bt_unlockpage(BtLockWrite, rset); + bt_unpinlatch (rset); + bt_unpinpool (rpool); + + bt_spinreleasewrite(bt->mgr->latchmgr->lock); + + // delete our obsolete fence key from our parent + + slotptr(ppage, slot)->dead = 1; + ppage->dirty = 1; + + // if our parent now empty + // remove it from the tree + + if( ppage->act-- == 1 ) + if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) ) + return bt->err; + +rmergexit: + bt_unlockpage (BtLockWrite, pset); + bt_unpinlatch (pset); + bt_unpinpool (ppool); + + bt->found = 1; + return bt->err = 0; +} + +// remove empty page from the B-tree +// try merging left first. If no left +// sibling, then merge right. + +// call with page loaded and locked, +// return with page locked. + +BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl) +{ +unsigned char fencekey[256], postkey[256]; +uint slot, idx, postfence = 0; +BtLatchSet *lset, *pset; +BtPool *lpool, *ppool; +BtPage lpage, ppage; +uid left, parent; +BtKey ptr; + + ptr = keyptr(page, page->cnt); + memcpy(fencekey, ptr, ptr->len + 1); + bt_unlockpage (BtLockWrite, set); + + // load and lock our parent + +retry: + if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) ) return bt->err; + + parent = bt->page_no; + ppage = bt->page; + ppool = bt->pool; + pset = bt->set; + + // wait until we are posted in our parent + + if( !bt->parent ) { + bt_unlockpage (BtLockWrite, pset); + bt_unpinlatch (pset); + bt_unpinpool (ppool); +#ifdef unix + sched_yield(); +#else + SwitchToThread(); +#endif + goto retry; } - // obtain Parent lock over write lock + // find our left neighbor in our parent page - bt_lockpage(BtLockParent, set); + for( idx = slot; --idx; ) + if( !slotptr(ppage, idx)->dead ) + break; - // keep copy of key to delete + // if no left neighbor, do right merge - ptr = keyptr(bt->page, bt->page->cnt); - memcpy(lowerkey, ptr, ptr->len + 1); + if( !idx ) + return bt_mergeright (bt, page, pool, set, page_no, lvl, slot); - // lock and map right page + // lock and map our left neighbor's page - if( rpool = bt_pinpool (bt, right) ) - rpage = bt_page (bt, rpool, right); + left = bt_getid (slotptr(ppage, idx)->id); + + if( lpool = bt_pinpool (bt, left) ) + lpage = bt_page (bt, lpool, left); else return bt->err; - rset = bt_pinlatch (bt, right); - bt_lockpage(BtLockWrite, rset); + lset = bt_pinlatch (bt, left); + bt_lockpage(BtLockWrite, lset); - // pull contents of next page into current empty page + // wait until sibling is in our parent + + if( bt_getid (lpage->right) != page_no ) { + bt_unlockpage (BtLockWrite, pset); + bt_unpinlatch (pset); + bt_unpinpool (ppool); + bt_unlockpage (BtLockWrite, lset); + bt_unpinlatch (lset); + bt_unpinpool (lpool); +#ifdef linux + sched_yield(); +#else + SwitchToThread(); +#endif + goto retry; + } - memcpy (bt->page, rpage, bt->mgr->page_size); + // since our page will have no more pointers to it, + // obtain Delete lock and wait for write locks to clear - // keep copy of key to update + bt_lockpage(BtLockDelete, set); + bt_lockpage(BtLockWrite, set); - ptr = keyptr(rpage, rpage->cnt); - memcpy(higherkey, ptr, ptr->len + 1); + // if we aren't dead yet, + // get ready for exit - // Mark right page as deleted and point it to left page - // until we can post updates at higher level. + if( page->act ) { + bt_unlockpage(BtLockDelete, set); + bt_unlockpage(BtLockWrite, lset); + bt_unpinlatch (lset); + bt_unpinpool (lpool); + goto lmergexit; + } - bt_putid(rpage->right, page_no); - rpage->kill = 1; - rpage->cnt = 0; + // are we are the fence key for our parent? + // if so, grab our old fence key - bt_unlockpage(BtLockWrite, rset); - bt_unlockpage(BtLockWrite, set); + if( postfence = slot == ppage->cnt ) { + ptr = keyptr (ppage, ppage->cnt); + memcpy(fencekey, ptr, ptr->len + 1); + memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot)); - // delete old lower key to consolidated node + // clear out other dead slots - if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) ) - return bt->err; + while( --ppage->cnt ) + if( slotptr(ppage, ppage->cnt)->dead ) + memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot)); + else + break; - // redirect higher key directly to consolidated node + ptr = keyptr (ppage, ppage->cnt); + memcpy(postkey, ptr, ptr->len + 1); + } else + slotptr(ppage,slot)->dead = 1; - tod = (uint)time(NULL); + ppage->dirty = 1; + ppage->act--; - if( bt_insertkey (bt, higherkey+1, *higherkey, lvl + 1, page_no, tod) ) - return bt->err; + // push our right neighbor pointer to our left - // add killed right block to free chain + memcpy (lpage->right, page->right, BtId); + + // add ourselves to free chain // lock latch mgr bt_spinwritelock(bt->mgr->latchmgr->lock); // store free chain in allocation page second right - bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right)); - bt_putid(bt->mgr->latchmgr->alloc[1].right, right); + bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right)); + bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no); - // unlock latch mgr and unpin right page + // unlock latch mgr and pages bt_spinreleasewrite(bt->mgr->latchmgr->lock); - bt_unpinlatch (rset); - bt_unpinpool (rpool); + bt_unlockpage(BtLockWrite, lset); + bt_unpinlatch (lset); + bt_unpinpool (lpool); + + // release our node's delete lock + + bt_unlockpage(BtLockDelete, set); - // remove ParentModify lock +lmergexit: + bt_unlockpage (BtLockWrite, pset); + bt_unpinpool (ppool); + + // do we need to post parent's fence key in its parent? + + if( !postfence || parent == ROOT_page ) { + bt_unpinlatch (pset); + bt->found = 1; + return bt->err = 0; + } + + // interlock parent fence post + + bt_lockpage (BtLockParent, pset); + + // load parent's parent page +posttry: + if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) ) + return bt->err; - bt_unlockpage(BtLockParent, set); + if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) ) + if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) ) + return bt->err; + else + goto posttry; + + page = bt->page; + + page->min -= *postkey + 1; + ((unsigned char *)page)[page->min] = *postkey; + memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey ); + slotptr(page, slot)->off = page->min; + + bt_unlockpage (BtLockParent, pset); + bt_unpinlatch (pset); + + bt_unlockpage (BtLockWrite, bt->set); + bt_unpinlatch (bt->set); + bt_unpinpool (bt->pool); + + bt->found = 1; + return bt->err = 0; +} + +// find and delete key on page by marking delete flag bit +// if page becomes empty, delete it from the btree + +BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len) +{ +BtLatchSet *set; +BtPool *pool; +BtPage page; +uid page_no; +BtKey ptr; +uint slot; + + if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) ) + return bt->err; + + page_no = bt->page_no; + page = bt->page; + pool = bt->pool; + set = bt->set; + + // if key is found delete it, otherwise ignore request + + ptr = keyptr(page, slot); + + if( bt->found = !keycmp (ptr, key, len) ) + if( bt->found = slotptr(page, slot)->dead == 0 ) { + slotptr(page,slot)->dead = 1; + if( slot < page->cnt ) + page->dirty = 1; + if( !--page->act ) + if( bt_mergeleft (bt, page, pool, set, page_no, 0) ) + return bt->err; + } + + bt_unlockpage(BtLockWrite, set); bt_unpinlatch (set); bt_unpinpool (pool); - return 0; + return bt->err = 0; } // find key in leaf level and return row-id @@ -1642,7 +1914,7 @@ uid id; // if key exists, return row-id // otherwise return 0 - if( ptr->len == len && !memcmp (ptr->key, key, len) ) + if( slot <= bt->page->cnt && !keycmp (ptr, key, len) ) id = bt_getid(slotptr(bt->page,slot)->id); else id = 0; @@ -1655,16 +1927,15 @@ uid id; // check page for space available, // clean if necessary and return -// =0 - page needs splitting -// >0 - go ahead at returned slot +// 0 - page needs splitting +// >0 new slot value -uint bt_cleanpage(BtDb *bt, uint amt, uint slot) +uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot) { uint nxt = bt->mgr->page_size; -BtPage page = bt->page; uint cnt = 0, idx = 0; uint max = page->cnt; -uint newslot; +uint newslot = max; BtKey key; if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 ) @@ -1683,15 +1954,19 @@ BtKey key; page->dirty = 0; page->act = 0; - // always leave fence key in list + // try cleaning up page first + + // always leave fence key in the array + // otherwise, remove deleted key while( cnt++ < max ) { if( cnt == slot ) newslot = idx + 1; - else if( cnt < max && slotptr(bt->frame,cnt)->dead ) + if( cnt < max && slotptr(bt->frame,cnt)->dead ) continue; // copy key + key = keyptr(bt->frame, cnt); nxt -= key->len + 1; memcpy ((unsigned char *)page + nxt, key, key->len + 1); @@ -1703,26 +1978,59 @@ BtKey key; slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod; slotptr(page, idx)->off = nxt; } + page->min = nxt; page->cnt = idx; + // see if page has enough space now, or does it need splitting? + if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 ) return newslot; return 0; } -// split the root and raise the height of the btree +// add key to current page +// page must already be writelocked + +void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod) +{ +uint idx; + + // find next available dead slot and copy key onto page + + for( idx = slot; idx < page->cnt; idx++ ) + if( slotptr(page, idx)->dead ) + break; + + if( idx == page->cnt ) + idx++, page->cnt++; + + page->act++; + + // now insert key into array before slot -BTERR bt_splitroot(BtDb *bt, unsigned char *newkey, unsigned char *oldkey, uid page_no2) + while( idx > slot ) + *slotptr(page, idx) = *slotptr(page, idx -1), idx--; + + page->min -= len + 1; + ((unsigned char *)page)[page->min] = len; + memcpy ((unsigned char *)page + page->min +1, key, len ); + + bt_putid(slotptr(page,slot)->id, id); + slotptr(page, slot)->off = page->min; + slotptr(page, slot)->tod = tod; + slotptr(page, slot)->dead = 0; +} + +BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2) { uint nxt = bt->mgr->page_size; BtPage root = bt->page; uid new_page; // Obtain an empty page to use, and copy the current - // root contents into it which is the lower half of - // the old root. + // root contents into it if( !(new_page = bt_newpage(bt, root)) ) return bt->err; @@ -1734,16 +2042,17 @@ uid new_page; // insert first key on newroot page - nxt -= *newkey + 1; - memcpy ((unsigned char *)root + nxt, newkey, *newkey + 1); + nxt -= *leftkey + 1; + memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1); bt_putid(slotptr(root, 1)->id, new_page); slotptr(root, 1)->off = nxt; - // insert second key on newroot page + // insert second key (stopper key) on newroot page // and increase the root height - nxt -= *oldkey + 1; - memcpy ((unsigned char *)root + nxt, oldkey, *oldkey + 1); + nxt -= 3; + *((unsigned char *)root + nxt) = 2; + memset ((unsigned char *)root + nxt + 1, 0xff, 2); bt_putid(slotptr(root, 2)->id, page_no2); slotptr(root, 2)->off = nxt; @@ -1762,31 +2071,26 @@ uid new_page; } // split already locked full node -// return unlocked. +// return unlocked and unpinned. -BTERR bt_splitpage (BtDb *bt) +BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no) { -uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size; -unsigned char oldkey[256], lowerkey[256]; -uid page_no = bt->page_no, right; -BtLatchSet *nset, *set = bt->set; -BtPool *pool = bt->pool; -BtPage page = bt->page; +uint slot, cnt, idx, max, nxt = bt->mgr->page_size; +unsigned char rightkey[256], leftkey[256]; +uint tod = time(NULL); uint lvl = page->lvl; uid new_page; BtKey key; -uint tod; - // split higher half of keys to bt->frame - // the last key (fence key) might be dead - - tod = (uint)time(NULL); + // initialize frame buffer for right node memset (bt->frame, 0, bt->mgr->page_size); - max = (int)page->cnt; + max = page->cnt; cnt = max / 2; idx = 0; + // split higher half of keys to bt->frame + while( cnt++ < max ) { key = keyptr(page, cnt); nxt -= key->len + 1; @@ -1798,166 +2102,142 @@ uint tod; slotptr(bt->frame, idx)->off = nxt; } - // remember existing fence key for new page to the right + // transfer right link node to new right node - memcpy (oldkey, key, key->len + 1); + if( page_no > ROOT_page ) + memcpy (bt->frame->right, page->right, BtId); bt->frame->bits = bt->mgr->page_bits; bt->frame->min = nxt; bt->frame->cnt = idx; bt->frame->lvl = lvl; - // link right node - - if( page_no > ROOT_page ) { - right = bt_getid (page->right); - bt_putid(bt->frame->right, right); - } - - // get new free page and write frame to it. + // get new free page and write right frame to it. if( !(new_page = bt_newpage(bt, bt->frame)) ) return bt->err; + // remember fence key for new right page to add + // as right sibling to the left node + + key = keyptr(bt->frame, idx); + memcpy (rightkey, key, key->len + 1); + // update lower keys to continue in old page memcpy (bt->frame, page, bt->mgr->page_size); memset (page+1, 0, bt->mgr->page_size - sizeof(*page)); nxt = bt->mgr->page_size; + page->dirty = 0; page->act = 0; cnt = 0; idx = 0; // assemble page of smaller keys - // (they're all active keys) + // to remain in the old page while( cnt++ < max / 2 ) { key = keyptr(bt->frame, cnt); nxt -= key->len + 1; memcpy ((unsigned char *)page + nxt, key, key->len + 1); - memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId); + memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId); + if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) ) + page->act++; slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod; slotptr(page, idx)->off = nxt; - page->act++; } - // remember fence key for old page + // finalize left page and save fence key - memcpy(lowerkey, key, key->len + 1); - bt_putid(page->right, new_page); + memcpy(leftkey, key, key->len + 1); page->min = nxt; page->cnt = idx; + // link new right page + + bt_putid (page->right, new_page); + // if current page is the root page, split it if( page_no == ROOT_page ) - return bt_splitroot (bt, lowerkey, oldkey, new_page); - - // obtain Parent/Write locks - // for left and right node pages + return bt_splitroot (bt, leftkey, new_page); - nset = bt_pinlatch (bt, new_page); + // obtain ParentModification lock for current page - bt_lockpage (BtLockParent, nset); bt_lockpage (BtLockParent, set); - // release wr lock on left page - // (keep the SMO in sequence) + // release wr lock on our page. + // this will keep out another SMO bt_unlockpage (BtLockWrite, set); - // insert new fence for reformulated left block + // insert key for old page (lower keys) - if( bt_insertkey (bt, lowerkey+1, *lowerkey, lvl + 1, page_no, tod) ) + if( bt_insertkey (bt, leftkey + 1, *leftkey, page_no, tod, lvl + 1) ) return bt->err; - // fix old fence for newly allocated right block page + // switch old parent key from us to our right page - if( bt_insertkey (bt, oldkey+1, *oldkey, lvl + 1, new_page, tod) ) + if( bt_insertkey (bt, rightkey + 1, *rightkey, new_page, tod, lvl + 1) ) return bt->err; - // release Parent locks + // unlock and unpin - bt_unlockpage (BtLockParent, nset); bt_unlockpage (BtLockParent, set); - bt_unpinlatch (nset); bt_unpinlatch (set); bt_unpinpool (pool); return 0; } -// Insert new key into the btree at requested level. -// Level zero pages are leaf pages. Page is unlocked at exit. +// Insert new key into the btree at given level. -BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod) +BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl) { uint slot, idx; BtPage page; BtKey ptr; - while( 1 ) { - if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) ) - ptr = keyptr(bt->page, slot); - else - { - if ( !bt->err ) - bt->err = BTERR_ovflw; - return bt->err; - } - - // if key already exists, update id and return - - page = bt->page; - - if( bt->found = !keycmp (ptr, key, len) ) { - slotptr(page, slot)->dead = 0; - slotptr(page, slot)->tod = tod; - bt_putid(slotptr(page,slot)->id, id); - bt_unlockpage(BtLockWrite, bt->set); - bt_unpinlatch(bt->set); - bt_unpinpool (bt->pool); - return bt->err; - } - - // check if page has enough space - - if( slot = bt_cleanpage (bt, len, slot) ) - break; - - if( bt_splitpage (bt) ) - return bt->err; - } - - // calculate next available slot and copy key into page + while( 1 ) { + if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) ) + ptr = keyptr(bt->page, slot); + else + { + if ( !bt->err ) + bt->err = BTERR_ovflw; + return bt->err; + } - page->min -= len + 1; // reset lowest used offset - ((unsigned char *)page)[page->min] = len; - memcpy ((unsigned char *)page + page->min +1, key, len ); + // if key already exists, update id and return - for( idx = slot; idx < page->cnt; idx++ ) - if( slotptr(page, idx)->dead ) - break; + page = bt->page; - // now insert key into array before slot - // preserving the fence slot + if( !keycmp (ptr, key, len) ) { + if( slotptr(page, slot)->dead ) + page->act++; + slotptr(page, slot)->dead = 0; + slotptr(page, slot)->tod = tod; + bt_putid(slotptr(page,slot)->id, id); + bt_unlockpage(BtLockWrite, bt->set); + bt_unpinlatch (bt->set); + bt_unpinpool (bt->pool); + return bt->err; + } - if( idx == page->cnt ) - idx++, page->cnt++; + // check if page has enough space - page->act++; + if( slot = bt_cleanpage (bt, bt->page, len, slot) ) + break; - while( idx > slot ) - *slotptr(page, idx) = *slotptr(page, idx -1), idx--; + if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) ) + return bt->err; + } - bt_putid(slotptr(page,slot)->id, id); - slotptr(page, slot)->off = page->min; - slotptr(page, slot)->tod = tod; - slotptr(page, slot)->dead = 0; + bt_addkeytopage (bt, bt->page, slot, key, len, id, tod); - bt_unlockpage (BtLockWrite, bt->set); - bt_unpinlatch (bt->set); - bt_unpinpool (bt->pool); - return 0; + bt_unlockpage (BtLockWrite, bt->set); + bt_unpinlatch (bt->set); + bt_unpinpool (bt->pool); + return 0; } // cache page of keys into cursor and return starting slot for given key @@ -1969,7 +2249,9 @@ uint slot; // cache page for retrieval if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) ) memcpy (bt->cursor, bt->page, bt->mgr->page_size); + bt->cursor_page = bt->page_no; + bt_unlockpage(BtLockRead, bt->set); bt_unpinlatch (bt->set); bt_unpinpool (bt->pool); @@ -1981,6 +2263,7 @@ uint slot; uint bt_nextkey (BtDb *bt, uint slot) { +BtLatchSet *set; BtPool *pool; BtPage page; uid right; @@ -1990,7 +2273,7 @@ uid right; while( slot++ < bt->cursor->cnt ) if( slotptr(bt->cursor,slot)->dead ) continue; - else if( right || (slot < bt->cursor->cnt)) + else if( right || (slot < bt->cursor->cnt) ) return slot; else break; @@ -1999,19 +2282,18 @@ uid right; break; bt->cursor_page = right; - if( pool = bt_pinpool (bt, right) ) page = bt_page (bt, pool, right); else return 0; - bt->set = bt_pinlatch (bt, right); - bt_lockpage(BtLockRead, bt->set); + set = bt_pinlatch (bt, right); + bt_lockpage(BtLockRead, set); memcpy (bt->cursor, page, bt->mgr->page_size); - bt_unlockpage(BtLockRead, bt->set); - bt_unpinlatch (bt->set); + bt_unlockpage(BtLockRead, set); + bt_unpinlatch (set); bt_unpinpool (pool); slot = 0; } while( 1 ); @@ -2034,6 +2316,7 @@ uint bt_tod(BtDb *bt, uint slot) return slotptr(bt->cursor,slot)->tod; } + #ifdef STANDALONE void bt_latchaudit (BtDb *bt) @@ -2105,6 +2388,7 @@ uid next, page_no = LEAF_page; // start on first page of leaves unsigned char key[256]; ThreadArg *args = arg; int ch, len = 0, slot; +BtLatchSet *set; time_t tod[1]; BtPool *pool; BtPage page; @@ -2137,7 +2421,7 @@ FILE *in; else if( args->num ) sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - if( bt_insertkey (bt, key, len, 0, line, *tod) ) + if( bt_insertkey (bt, key, len, line, *tod, 0) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); len = 0; } @@ -2159,7 +2443,7 @@ FILE *in; else if( args->num ) sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - if( bt_deletekey (bt, key, len, 0) ) + if( bt_deletekey (bt, key, len) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); len = 0; } @@ -2214,17 +2498,17 @@ FILE *in; fprintf(stderr, "started reading\n"); do { - if( bt->pool = bt_pinpool (bt, page_no) ) - page = bt_page (bt, bt->pool, page_no); + if( pool = bt_pinpool (bt, page_no) ) + page = bt_page (bt, pool, page_no); else break; - bt->set = bt_pinlatch (bt, page_no); - bt_lockpage (BtLockRead, bt->set); + set = bt_pinlatch (bt, page_no); + bt_lockpage (BtLockRead, set); cnt += page->act; next = bt_getid (page->right); - bt_unlockpage (BtLockRead, bt->set); - bt_unpinlatch (bt->set); - bt_unpinpool (bt->pool); + bt_unlockpage (BtLockRead, set); + bt_unpinlatch (set); + bt_unpinpool (pool); } while( page_no = next ); cnt--; // remove stopper key diff --git a/threads2j.c b/threads2j.c index e5baf7b..54e14bc 100644 --- a/threads2j.c +++ b/threads2j.c @@ -1,5 +1,5 @@ // btree version threads2j linux futex concurrency version -// 24 JAN 2014 +// 29 JAN 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -154,11 +154,9 @@ typedef struct Page { uint act; // count of active keys uint min; // next key offset unsigned char bits; // page size in bits - unsigned char lvl:6; // level of page - unsigned char kill:1; // page is being deleted + unsigned char lvl:7; // level of page unsigned char dirty:1; // page has deleted keys unsigned char right[BtId]; // page number to right - BtSlot table[0]; // array of key slots } *BtPage; // hash table entries @@ -249,6 +247,7 @@ typedef struct { BtLatchSet *set; // current page latchset BtPool *pool; // current page pool unsigned char *mem; // frame, cursor, page memory buffer + int parent; // last loadpage was from a parent level int found; // last delete or insert was found int err; // last error } BtDb; @@ -266,12 +265,17 @@ typedef enum { // B-Tree functions extern void bt_close (BtDb *bt); extern BtDb *bt_open (BtMgr *mgr); -extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod); -extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl); +extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl); +extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len); extern uid bt_findkey (BtDb *bt, unsigned char *key, uint len); extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len); extern uint bt_nextkey (BtDb *bt, uint slot); +// internal functions +BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no); +uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot); +BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl); + // manager functions extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize); void bt_mgrclose (BtMgr *mgr); @@ -339,7 +343,7 @@ extern uint bt_tod (BtDb *bt, uint slot); // Access macros to address slot and key values from the page -#define slotptr(page, slot) (page->table + slot-1) +#define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1)) #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off)) void bt_putid(unsigned char *dest, uid id) @@ -1448,6 +1452,7 @@ BtLatchSet *set, *prevset; uint drill = 0xff, slot; uint mode, prevmode; BtPool *prevpool; +int parent = 1; // start at root of btree and drill down @@ -1507,30 +1512,33 @@ BtPool *prevpool; // find key on page at this level // and descend to requested level - if( !bt->page->kill && (slot = bt_findslot (bt, key, len)) ) { + if( slot = bt_findslot (bt, key, len) ) { if( drill == lvl ) - return slot; + return bt->parent = parent, slot; while( slotptr(bt->page, slot)->dead ) if( slot++ < bt->page->cnt ) continue; else { page_no = bt_getid(bt->page->right); + parent = 0; goto slideright; } page_no = bt_getid(slotptr(bt->page, slot)->id); + parent = 1; drill--; } // or slide right into next page - // (slide left from deleted page) - else + else { page_no = bt_getid(bt->page->right); + parent = 0; + } // continue down / right using overlapping locks - // to protect pages being killed or split. + // to protect pages being split. slideright: prevpage = bt->page_no; @@ -1545,119 +1553,384 @@ slideright: return 0; // return error } -// find and delete key on page by marking delete flag bit -// when page becomes empty, delete it +// remove empty page from the B-tree +// by pulling our right node left over ourselves + +// call with bt->page, etc, set to page's locked parent +// returns with page locked. -BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl) +BTERR bt_mergeright (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl, uint slot) { -unsigned char lowerkey[256], higherkey[256]; -BtLatchSet *rset, *set; -BtPool *pool, *rpool; -uid page_no, right; -uint slot, tod; -BtPage rpage; +BtLatchSet *rset, *pset, *rpset; +BtPool *rpool, *ppool, *rppool; +BtPage rpage, ppage, rppage; +uid right, parent, rparent; BtKey ptr; +uint idx; - if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) ) - ptr = keyptr(bt->page, slot); + // cache node's parent page + + parent = bt->page_no; + ppage = bt->page; + ppool = bt->pool; + pset = bt->set; + + // lock and map our right page + // it cannot be NULL because of the stopper + // in the last right page + + bt_lockpage (BtLockWrite, set); + + // if we aren't dead yet + + if( page->act ) + goto rmergexit; + + if( right = bt_getid (page->right) ) + if( rpool = bt_pinpool (bt, right) ) + rpage = bt_page (bt, rpool, right); + else + return bt->err; else + return bt->err = BTERR_struct; + + rset = bt_pinlatch (bt, right); + + // find our right neighbor + + if( ppage->act > 1 ) { + for( idx = slot; idx++ < ppage->cnt; ) + if( !slotptr(ppage, idx)->dead ) + break; + + if( idx > ppage->cnt ) + return bt->err = BTERR_struct; + + // redirect right neighbor in parent to left node + + bt_putid(slotptr(ppage,idx)->id, page_no); + } + + // if parent has only our deleted page, e.g. no right neighbor + // prepare to merge parent itself + + if( ppage->act == 1 ) { + if( rparent = bt_getid (ppage->right) ) + if( rppool = bt_pinpool (bt, rparent) ) + rppage = bt_page (bt, rppool, rparent); + else return bt->err; + else + return bt->err = BTERR_struct; - // if key is found delete it, otherwise ignore request + rpset = bt_pinlatch (bt, rparent); + bt_lockpage (BtLockWrite, rpset); - if( bt->found = !keycmp (ptr, key, len) ) - if( bt->found = slotptr(bt->page, slot)->dead == 0 ) { - slotptr(bt->page,slot)->dead = 1; - if( slot < bt->page->cnt ) - bt->page->dirty = 1; - bt->page->act--; + // find our right neighbor on right parent page + + for( idx = 0; idx++ < rppage->cnt; ) + if( !slotptr(rppage, idx)->dead ) { + bt_putid (slotptr(rppage, idx)->id, page_no); + break; } - // return if page is not empty, or it has no right sibling + if( idx > rppage->cnt ) + return bt->err = BTERR_struct; + } - right = bt_getid(bt->page->right); - page_no = bt->page_no; - pool = bt->pool; - set = bt->set; + // now that there are no more pointers to our right node + // we can wait for delete lock on it - if( !right || bt->page->act ) { - bt_unlockpage(BtLockWrite, set); - bt_unpinlatch (set); - bt_unpinpool (pool); + bt_lockpage(BtLockDelete, rset); + bt_lockpage(BtLockWrite, rset); + + // pull contents of right page into our empty page + + memcpy (page, rpage, bt->mgr->page_size); + + // ready to release right parent lock + // now that we have a new page in place + + if( ppage->act == 1 ) { + bt_unlockpage (BtLockWrite, rpset); + bt_unpinlatch (rpset); + bt_unpinpool (rppool); + } + + // add killed right block to free chain + // lock latch mgr + + bt_spinwritelock(bt->mgr->latchmgr->lock, 0); + + // store free chain in allocation page second right + + bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right)); + bt_putid(bt->mgr->latchmgr->alloc[1].right, right); + + // unlock latch mgr and right page + + bt_unlockpage(BtLockDelete, rset); + bt_unlockpage(BtLockWrite, rset); + bt_unpinlatch (rset); + bt_unpinpool (rpool); + + bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0); + + // delete our obsolete fence key from our parent + + slotptr(ppage, slot)->dead = 1; + ppage->dirty = 1; + + // if our parent now empty + // remove it from the tree + + if( ppage->act-- == 1 ) + if( bt_mergeleft (bt, ppage, ppool, pset, parent, lvl+1) ) return bt->err; + +rmergexit: + bt_unlockpage (BtLockWrite, pset); + bt_unpinlatch (pset); + bt_unpinpool (ppool); + + bt->found = 1; + return bt->err = 0; +} + +// remove empty page from the B-tree +// try merging left first. If no left +// sibling, then merge right. + +// call with page loaded and locked, +// return with page locked. + +BTERR bt_mergeleft (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no, uint lvl) +{ +unsigned char fencekey[256], postkey[256]; +uint slot, idx, postfence = 0; +BtLatchSet *lset, *pset; +BtPool *lpool, *ppool; +BtPage lpage, ppage; +uid left, parent; +BtKey ptr; + + ptr = keyptr(page, page->cnt); + memcpy(fencekey, ptr, ptr->len + 1); + bt_unlockpage (BtLockWrite, set); + + // load and lock our parent + +retry: + if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+1, BtLockWrite)) ) + return bt->err; + + parent = bt->page_no; + ppage = bt->page; + ppool = bt->pool; + pset = bt->set; + + // wait until we are posted in our parent + + if( !bt->parent ) { + bt_unlockpage (BtLockWrite, pset); + bt_unpinlatch (pset); + bt_unpinpool (ppool); +#ifdef unix + sched_yield(); +#else + SwitchToThread(); +#endif + goto retry; } - // obtain Parent lock over write lock + // find our left neighbor in our parent page - bt_lockpage(BtLockParent, set); + for( idx = slot; --idx; ) + if( !slotptr(ppage, idx)->dead ) + break; - // keep copy of key to delete + // if no left neighbor, do right merge - ptr = keyptr(bt->page, bt->page->cnt); - memcpy(lowerkey, ptr, ptr->len + 1); + if( !idx ) + return bt_mergeright (bt, page, pool, set, page_no, lvl, slot); - // lock and map right page + // lock and map our left neighbor's page - if( rpool = bt_pinpool (bt, right) ) - rpage = bt_page (bt, rpool, right); + left = bt_getid (slotptr(ppage, idx)->id); + + if( lpool = bt_pinpool (bt, left) ) + lpage = bt_page (bt, lpool, left); else return bt->err; - rset = bt_pinlatch (bt, right); - bt_lockpage(BtLockWrite, rset); + lset = bt_pinlatch (bt, left); + bt_lockpage(BtLockWrite, lset); - // pull contents of next page into current empty page + // wait until sibling is in our parent - memcpy (bt->page, rpage, bt->mgr->page_size); + if( bt_getid (lpage->right) != page_no ) { + bt_unlockpage (BtLockWrite, pset); + bt_unpinlatch (pset); + bt_unpinpool (ppool); + bt_unlockpage (BtLockWrite, lset); + bt_unpinlatch (lset); + bt_unpinpool (lpool); +#ifdef linux + sched_yield(); +#else + SwitchToThread(); +#endif + goto retry; + } - // keep copy of key to update + // since our page will have no more pointers to it, + // obtain Delete lock and wait for write locks to clear - ptr = keyptr(rpage, rpage->cnt); - memcpy(higherkey, ptr, ptr->len + 1); + bt_lockpage(BtLockDelete, set); + bt_lockpage(BtLockWrite, set); - // Mark right page as deleted and point it to left page - // until we can post updates at higher level. + // if we aren't dead yet, + // get ready for exit - bt_putid(rpage->right, page_no); - rpage->kill = 1; - rpage->cnt = 0; + if( page->act ) { + bt_unlockpage(BtLockDelete, set); + bt_unlockpage(BtLockWrite, lset); + bt_unpinlatch (lset); + bt_unpinpool (lpool); + goto lmergexit; + } - bt_unlockpage(BtLockWrite, rset); - bt_unlockpage(BtLockWrite, set); + // are we are the fence key for our parent? + // if so, grab our old fence key - // delete old lower key to consolidated node + if( postfence = slot == ppage->cnt ) { + ptr = keyptr (ppage, ppage->cnt); + memcpy(fencekey, ptr, ptr->len + 1); + memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot)); - if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) ) - return bt->err; + // clear out other dead slots - // redirect higher key directly to consolidated node + while( --ppage->cnt ) + if( slotptr(ppage, ppage->cnt)->dead ) + memset(slotptr(ppage, ppage->cnt), 0, sizeof(BtSlot)); + else + break; - tod = (uint)time(NULL); + ptr = keyptr (ppage, ppage->cnt); + memcpy(postkey, ptr, ptr->len + 1); + } else + slotptr(ppage,slot)->dead = 1; - if( bt_insertkey (bt, higherkey+1, *higherkey, lvl + 1, page_no, tod) ) - return bt->err; + ppage->dirty = 1; + ppage->act--; - // add killed right block to free chain + // push our right neighbor pointer to our left + + memcpy (lpage->right, page->right, BtId); + + // add ourselves to free chain // lock latch mgr bt_spinwritelock(bt->mgr->latchmgr->lock, 0); // store free chain in allocation page second right - bt_putid(rpage->right, bt_getid(bt->mgr->latchmgr->alloc[1].right)); - bt_putid(bt->mgr->latchmgr->alloc[1].right, right); + bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right)); + bt_putid(bt->mgr->latchmgr->alloc[1].right, page_no); - // unlock latch mgr and unpin right page + // unlock latch mgr and pages bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0); - bt_unpinlatch (rset); - bt_unpinpool (rpool); + bt_unlockpage(BtLockWrite, lset); + bt_unpinlatch (lset); + bt_unpinpool (lpool); + + // release our node's delete lock + + bt_unlockpage(BtLockDelete, set); + +lmergexit: + bt_unlockpage (BtLockWrite, pset); + bt_unpinpool (ppool); + + // do we need to post parent's fence key in its parent? + + if( !postfence || parent == ROOT_page ) { + bt_unpinlatch (pset); + bt->found = 1; + return bt->err = 0; + } + + // interlock parent fence post - // remove ParentModify lock + bt_lockpage (BtLockParent, pset); - bt_unlockpage(BtLockParent, set); + // load parent's parent page +posttry: + if( !(slot = bt_loadpage (bt, fencekey+1, *fencekey, lvl+2, BtLockWrite)) ) + return bt->err; + + if( !(slot = bt_cleanpage (bt, bt->page, *fencekey, slot)) ) + if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) ) + return bt->err; + else + goto posttry; + + page = bt->page; + + page->min -= *postkey + 1; + ((unsigned char *)page)[page->min] = *postkey; + memcpy ((unsigned char *)page + page->min +1, postkey + 1, *postkey ); + slotptr(page, slot)->off = page->min; + + bt_unlockpage (BtLockParent, pset); + bt_unpinlatch (pset); + + bt_unlockpage (BtLockWrite, bt->set); + bt_unpinlatch (bt->set); + bt_unpinpool (bt->pool); + + bt->found = 1; + return bt->err = 0; +} + +// find and delete key on page by marking delete flag bit +// if page becomes empty, delete it from the btree + +BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len) +{ +BtLatchSet *set; +BtPool *pool; +BtPage page; +uid page_no; +BtKey ptr; +uint slot; + + if( !(slot = bt_loadpage (bt, key, len, 0, BtLockWrite)) ) + return bt->err; + + page_no = bt->page_no; + page = bt->page; + pool = bt->pool; + set = bt->set; + + // if key is found delete it, otherwise ignore request + + ptr = keyptr(page, slot); + + if( bt->found = !keycmp (ptr, key, len) ) + if( bt->found = slotptr(page, slot)->dead == 0 ) { + slotptr(page,slot)->dead = 1; + if( slot < page->cnt ) + page->dirty = 1; + if( !--page->act ) + if( bt_mergeleft (bt, page, pool, set, page_no, 0) ) + return bt->err; + } + + bt_unlockpage(BtLockWrite, set); bt_unpinlatch (set); bt_unpinpool (pool); - return 0; + return bt->err = 0; } // find key in leaf level and return row-id @@ -1676,7 +1949,7 @@ uid id; // if key exists, return row-id // otherwise return 0 - if( ptr->len == len && !memcmp (ptr->key, key, len) ) + if( slot <= bt->page->cnt && !keycmp (ptr, key, len) ) id = bt_getid(slotptr(bt->page,slot)->id); else id = 0; @@ -1689,16 +1962,15 @@ uid id; // check page for space available, // clean if necessary and return -// =0 - page needs splitting -// >0 - go ahead at returned slot +// 0 - page needs splitting +// >0 new slot value -uint bt_cleanpage(BtDb *bt, uint amt, uint slot) +uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot) { uint nxt = bt->mgr->page_size; -BtPage page = bt->page; uint cnt = 0, idx = 0; uint max = page->cnt; -uint newslot; +uint newslot = max; BtKey key; if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 ) @@ -1717,15 +1989,19 @@ BtKey key; page->dirty = 0; page->act = 0; - // always leave fence key in list + // try cleaning up page first + + // always leave fence key in the array + // otherwise, remove deleted key while( cnt++ < max ) { if( cnt == slot ) newslot = idx + 1; - else if( cnt < max && slotptr(bt->frame,cnt)->dead ) + if( cnt < max && slotptr(bt->frame,cnt)->dead ) continue; // copy key + key = keyptr(bt->frame, cnt); nxt -= key->len + 1; memcpy ((unsigned char *)page + nxt, key, key->len + 1); @@ -1737,26 +2013,59 @@ BtKey key; slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod; slotptr(page, idx)->off = nxt; } + page->min = nxt; page->cnt = idx; + // see if page has enough space now, or does it need splitting? + if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 ) return newslot; return 0; } -// split the root and raise the height of the btree +// add key to current page +// page must already be writelocked -BTERR bt_splitroot(BtDb *bt, unsigned char *newkey, unsigned char *oldkey, uid page_no2) +void bt_addkeytopage (BtDb *bt, BtPage page, uint slot, unsigned char *key, uint len, uid id, uint tod) +{ +uint idx; + + // find next available dead slot and copy key onto page + + for( idx = slot; idx < page->cnt; idx++ ) + if( slotptr(page, idx)->dead ) + break; + + if( idx == page->cnt ) + idx++, page->cnt++; + + page->act++; + + // now insert key into array before slot + + while( idx > slot ) + *slotptr(page, idx) = *slotptr(page, idx -1), idx--; + + page->min -= len + 1; + ((unsigned char *)page)[page->min] = len; + memcpy ((unsigned char *)page + page->min +1, key, len ); + + bt_putid(slotptr(page,slot)->id, id); + slotptr(page, slot)->off = page->min; + slotptr(page, slot)->tod = tod; + slotptr(page, slot)->dead = 0; +} + +BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2) { uint nxt = bt->mgr->page_size; BtPage root = bt->page; uid new_page; // Obtain an empty page to use, and copy the current - // root contents into it which is the lower half of - // the old root. + // root contents into it if( !(new_page = bt_newpage(bt, root)) ) return bt->err; @@ -1768,16 +2077,17 @@ uid new_page; // insert first key on newroot page - nxt -= *newkey + 1; - memcpy ((unsigned char *)root + nxt, newkey, *newkey + 1); + nxt -= *leftkey + 1; + memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1); bt_putid(slotptr(root, 1)->id, new_page); slotptr(root, 1)->off = nxt; - // insert second key on newroot page + // insert second key (stopper key) on newroot page // and increase the root height - nxt -= *oldkey + 1; - memcpy ((unsigned char *)root + nxt, oldkey, *oldkey + 1); + nxt -= 3; + *((unsigned char *)root + nxt) = 2; + memset ((unsigned char *)root + nxt + 1, 0xff, 2); bt_putid(slotptr(root, 2)->id, page_no2); slotptr(root, 2)->off = nxt; @@ -1796,31 +2106,26 @@ uid new_page; } // split already locked full node -// return unlocked. +// return unlocked and unpinned. -BTERR bt_splitpage (BtDb *bt) +BTERR bt_splitpage (BtDb *bt, BtPage page, BtPool *pool, BtLatchSet *set, uid page_no) { -uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size; -unsigned char oldkey[256], lowerkey[256]; -uid page_no = bt->page_no, right; -BtLatchSet *nset, *set = bt->set; -BtPool *pool = bt->pool; -BtPage page = bt->page; +uint slot, cnt, idx, max, nxt = bt->mgr->page_size; +unsigned char rightkey[256], leftkey[256]; +uint tod = time(NULL); uint lvl = page->lvl; uid new_page; BtKey key; -uint tod; - - // split higher half of keys to bt->frame - // the last key (fence key) might be dead - tod = (uint)time(NULL); + // initialize frame buffer for right node memset (bt->frame, 0, bt->mgr->page_size); - max = (int)page->cnt; + max = page->cnt; cnt = max / 2; idx = 0; + // split higher half of keys to bt->frame + while( cnt++ < max ) { key = keyptr(page, cnt); nxt -= key->len + 1; @@ -1832,166 +2137,142 @@ uint tod; slotptr(bt->frame, idx)->off = nxt; } - // remember existing fence key for new page to the right + // transfer right link node to new right node - memcpy (oldkey, key, key->len + 1); + if( page_no > ROOT_page ) + memcpy (bt->frame->right, page->right, BtId); bt->frame->bits = bt->mgr->page_bits; bt->frame->min = nxt; bt->frame->cnt = idx; bt->frame->lvl = lvl; - // link right node - - if( page_no > ROOT_page ) { - right = bt_getid (page->right); - bt_putid(bt->frame->right, right); - } - - // get new free page and write frame to it. + // get new free page and write right frame to it. if( !(new_page = bt_newpage(bt, bt->frame)) ) return bt->err; + // remember fence key for new right page to add + // as right sibling to the left node + + key = keyptr(bt->frame, idx); + memcpy (rightkey, key, key->len + 1); + // update lower keys to continue in old page memcpy (bt->frame, page, bt->mgr->page_size); memset (page+1, 0, bt->mgr->page_size - sizeof(*page)); nxt = bt->mgr->page_size; + page->dirty = 0; page->act = 0; cnt = 0; idx = 0; // assemble page of smaller keys - // (they're all active keys) + // to remain in the old page while( cnt++ < max / 2 ) { key = keyptr(bt->frame, cnt); nxt -= key->len + 1; memcpy ((unsigned char *)page + nxt, key, key->len + 1); - memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId); + memcpy (slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId); + if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) ) + page->act++; slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod; slotptr(page, idx)->off = nxt; - page->act++; } - // remember fence key for old page + // finalize left page and save fence key - memcpy(lowerkey, key, key->len + 1); - bt_putid(page->right, new_page); + memcpy(leftkey, key, key->len + 1); page->min = nxt; page->cnt = idx; + // link new right page + + bt_putid (page->right, new_page); + // if current page is the root page, split it if( page_no == ROOT_page ) - return bt_splitroot (bt, lowerkey, oldkey, new_page); - - // obtain Parent/Write locks - // for left and right node pages + return bt_splitroot (bt, leftkey, new_page); - nset = bt_pinlatch (bt, new_page); + // obtain ParentModification lock for current page - bt_lockpage (BtLockParent, nset); bt_lockpage (BtLockParent, set); - // release wr lock on left page - // (keep the SMO in sequence) + // release wr lock on our page. + // this will keep out another SMO bt_unlockpage (BtLockWrite, set); - // insert new fence for reformulated left block + // insert key for old page (lower keys) - if( bt_insertkey (bt, lowerkey+1, *lowerkey, lvl + 1, page_no, tod) ) + if( bt_insertkey (bt, leftkey + 1, *leftkey, page_no, tod, lvl + 1) ) return bt->err; - // fix old fence for newly allocated right block page + // switch old parent key from us to our right page - if( bt_insertkey (bt, oldkey+1, *oldkey, lvl + 1, new_page, tod) ) + if( bt_insertkey (bt, rightkey + 1, *rightkey, new_page, tod, lvl + 1) ) return bt->err; - // release Parent locks + // unlock and unpin - bt_unlockpage (BtLockParent, nset); bt_unlockpage (BtLockParent, set); - bt_unpinlatch (nset); bt_unpinlatch (set); bt_unpinpool (pool); return 0; } -// Insert new key into the btree at requested level. -// Level zero pages are leaf pages. Page is unlocked at exit. +// Insert new key into the btree at given level. -BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod) +BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl) { uint slot, idx; BtPage page; BtKey ptr; - while( 1 ) { - if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) ) - ptr = keyptr(bt->page, slot); - else - { - if ( !bt->err ) - bt->err = BTERR_ovflw; - return bt->err; - } - - // if key already exists, update id and return - - page = bt->page; - - if( bt->found = !keycmp (ptr, key, len) ) { - slotptr(page, slot)->dead = 0; - slotptr(page, slot)->tod = tod; - bt_putid(slotptr(page,slot)->id, id); - bt_unlockpage(BtLockWrite, bt->set); - bt_unpinlatch(bt->set); - bt_unpinpool (bt->pool); - return bt->err; - } - - // check if page has enough space - - if( slot = bt_cleanpage (bt, len, slot) ) - break; - - if( bt_splitpage (bt) ) - return bt->err; - } - - // calculate next available slot and copy key into page + while( 1 ) { + if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) ) + ptr = keyptr(bt->page, slot); + else + { + if ( !bt->err ) + bt->err = BTERR_ovflw; + return bt->err; + } - page->min -= len + 1; // reset lowest used offset - ((unsigned char *)page)[page->min] = len; - memcpy ((unsigned char *)page + page->min +1, key, len ); + // if key already exists, update id and return - for( idx = slot; idx < page->cnt; idx++ ) - if( slotptr(page, idx)->dead ) - break; + page = bt->page; - // now insert key into array before slot - // preserving the fence slot + if( !keycmp (ptr, key, len) ) { + if( slotptr(page, slot)->dead ) + page->act++; + slotptr(page, slot)->dead = 0; + slotptr(page, slot)->tod = tod; + bt_putid(slotptr(page,slot)->id, id); + bt_unlockpage(BtLockWrite, bt->set); + bt_unpinlatch (bt->set); + bt_unpinpool (bt->pool); + return bt->err; + } - if( idx == page->cnt ) - idx++, page->cnt++; + // check if page has enough space - page->act++; + if( slot = bt_cleanpage (bt, bt->page, len, slot) ) + break; - while( idx > slot ) - *slotptr(page, idx) = *slotptr(page, idx -1), idx--; + if( bt_splitpage (bt, bt->page, bt->pool, bt->set, bt->page_no) ) + return bt->err; + } - bt_putid(slotptr(page,slot)->id, id); - slotptr(page, slot)->off = page->min; - slotptr(page, slot)->tod = tod; - slotptr(page, slot)->dead = 0; + bt_addkeytopage (bt, bt->page, slot, key, len, id, tod); - bt_unlockpage (BtLockWrite, bt->set); - bt_unpinlatch (bt->set); - bt_unpinpool (bt->pool); - return 0; + bt_unlockpage (BtLockWrite, bt->set); + bt_unpinlatch (bt->set); + bt_unpinpool (bt->pool); + return 0; } // cache page of keys into cursor and return starting slot for given key @@ -2003,7 +2284,9 @@ uint slot; // cache page for retrieval if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) ) memcpy (bt->cursor, bt->page, bt->mgr->page_size); + bt->cursor_page = bt->page_no; + bt_unlockpage(BtLockRead, bt->set); bt_unpinlatch (bt->set); bt_unpinpool (bt->pool); @@ -2015,6 +2298,7 @@ uint slot; uint bt_nextkey (BtDb *bt, uint slot) { +BtLatchSet *set; BtPool *pool; BtPage page; uid right; @@ -2024,7 +2308,7 @@ uid right; while( slot++ < bt->cursor->cnt ) if( slotptr(bt->cursor,slot)->dead ) continue; - else if( right || (slot < bt->cursor->cnt)) + else if( right || (slot < bt->cursor->cnt) ) return slot; else break; @@ -2033,19 +2317,18 @@ uid right; break; bt->cursor_page = right; - if( pool = bt_pinpool (bt, right) ) page = bt_page (bt, pool, right); else return 0; - bt->set = bt_pinlatch (bt, right); - bt_lockpage(BtLockRead, bt->set); + set = bt_pinlatch (bt, right); + bt_lockpage(BtLockRead, set); memcpy (bt->cursor, page, bt->mgr->page_size); - bt_unlockpage(BtLockRead, bt->set); - bt_unpinlatch (bt->set); + bt_unlockpage(BtLockRead, set); + bt_unpinlatch (set); bt_unpinpool (pool); slot = 0; } while( 1 ); @@ -2068,6 +2351,7 @@ uint bt_tod(BtDb *bt, uint slot) return slotptr(bt->cursor,slot)->tod; } + #ifdef STANDALONE void bt_latchaudit (BtDb *bt) @@ -2139,6 +2423,7 @@ uid next, page_no = LEAF_page; // start on first page of leaves unsigned char key[256]; ThreadArg *args = arg; int ch, len = 0, slot; +BtLatchSet *set; time_t tod[1]; BtPool *pool; BtPage page; @@ -2171,7 +2456,7 @@ FILE *in; else if( args->num ) sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - if( bt_insertkey (bt, key, len, 0, line, *tod) ) + if( bt_insertkey (bt, key, len, line, *tod, 0) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); len = 0; } @@ -2193,7 +2478,7 @@ FILE *in; else if( args->num ) sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; - if( bt_deletekey (bt, key, len, 0) ) + if( bt_deletekey (bt, key, len) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); len = 0; } @@ -2248,17 +2533,17 @@ FILE *in; fprintf(stderr, "started reading\n"); do { - if( bt->pool = bt_pinpool (bt, page_no) ) - page = bt_page (bt, bt->pool, page_no); + if( pool = bt_pinpool (bt, page_no) ) + page = bt_page (bt, pool, page_no); else break; - bt->set = bt_pinlatch (bt, page_no); - bt_lockpage (BtLockRead, bt->set); + set = bt_pinlatch (bt, page_no); + bt_lockpage (BtLockRead, set); cnt += page->act; next = bt_getid (page->right); - bt_unlockpage (BtLockRead, bt->set); - bt_unpinlatch (bt->set); - bt_unpinpool (bt->pool); + bt_unlockpage (BtLockRead, set); + bt_unpinlatch (set); + bt_unpinpool (pool); } while( page_no = next ); cnt--; // remove stopper key