From c6ac298fb4eb485dc47029a60f234a1ccb6ecca7 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 28 Oct 2014 15:27:19 -0700 Subject: [PATCH] Threadskv10f.c -- add finding keys in addition to penny-sort insert to LSM btree code --- threadskv10e.c => threadskv10f.c | 518 ++++++++++++++++--------------- 1 file changed, 262 insertions(+), 256 deletions(-) rename threadskv10e.c => threadskv10f.c (92%) diff --git a/threadskv10e.c b/threadskv10f.c similarity index 92% rename from threadskv10e.c rename to threadskv10f.c index fa5fdfb..fe6f369 100644 --- a/threadskv10e.c +++ b/threadskv10f.c @@ -1,4 +1,4 @@ -// btree version threadskv10e futex version +// btree version threadskv10f futex version // with reworked bt_deletekey code, // phase-fair re-entrant reader writer lock, // librarian page split code, @@ -8,9 +8,10 @@ // ACID batched key-value updates // redo log for failure recovery // LSM B-trees for write optimization -// and larger sized leaf pages than non-leaf +// larger sized leaf pages than non-leaf +// and LSM B-tree find operations -// 24 OCT 2014 +// 28 OCT 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -127,10 +128,10 @@ typedef struct { typedef struct { MutexLatch xcl[1]; MutexLatch wrt[1]; - uint readers; + ushort readers; ushort dup; // re-entrant locks ushort tid; // owner thread-no - uint line; // owner line # + ushort line; // owner line # } RWLock; // hash table entries @@ -144,11 +145,11 @@ typedef struct { typedef struct { uid page_no; // latch set page number - MutexLatch modify[1]; // modify entry lite latch RWLock readwr[1]; // read/write page lock RWLock access[1]; // Access Intent/Page delete RWLock parent[1]; // Posting of fence key in parent RWLock link[1]; // left link update in progress + MutexLatch modify[1]; // modify entry lite latch uint split; // right split page atomic insert uint next; // next entry in hash table chain uint prev; // prev entry in hash table chain @@ -249,6 +250,7 @@ typedef struct { unsigned char freechain[BtId]; // head of free page_nos chain unsigned char leafchain[BtId]; // head of leaf page_nos chain unsigned long long leafpages; // number of active leaf pages + unsigned long long upperpages; // number of active upper pages uint redopages; // number of redo pages in file unsigned char leaf_xtra; // leaf page size in xtra bits unsigned char page_bits; // base page size in bits @@ -306,7 +308,8 @@ typedef struct { typedef struct { BtMgr *mgr; // buffer manager for entire process BtMgr *main; // buffer manager for main btree - BtPage cursor; // cached page frame for start/next + BtPage cachecursor; // cached page frame for cache btree + BtPage maincursor; // cached page frame for main btree ushort thread_no; // thread number unsigned char key[BT_keyarray]; // last found complete key } BtDb; @@ -317,6 +320,7 @@ typedef struct { logseqno reqlsn; // redo log seq no required uint entry:31; // latch table entry number uint reuse:1; // reused previous page + uint slot; // slot on page } AtomicTxn; // Catastrophic errors @@ -373,8 +377,8 @@ extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char extern uint bt_startkey (BtDb *db, unsigned char *key, uint len); extern uint bt_nextkey (BtDb *bt, uint slot); -extern uint bt_prevkey (BtDb *db, uint slot); -extern uint bt_lastkey (BtDb *db); +extern uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no); +extern uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no); // manager functions extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages); @@ -384,7 +388,7 @@ extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no); // atomic transaction functions BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no); -BTERR bt_txnpromote (BtDb *bt); +BTERR bt_promote (BtDb *bt); // The page is allocated from low and hi ends. // The key slots are allocated from the bottom, @@ -432,7 +436,7 @@ BTERR bt_txnpromote (BtDb *bt); // Access macros to address slot and key values from the page // Page slots use 1 based indexing. -#define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1)) +#define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1)) #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off)) #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len)) @@ -494,9 +498,11 @@ int bt_mutextry(MutexLatch *latch) void bt_releasemutex(MutexLatch *latch) { - *latch->bits->xcl = 0; +MutexLatch prev[1]; - if( *latch->bits->waiters ) + *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000); + + if( *prev->bits->waiters ) sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 ); } @@ -1291,11 +1297,15 @@ uint slot; void bt_close (BtDb *bt) { #ifdef unix - if( bt->cursor ) - free (bt->cursor); + if( bt->cachecursor ) + free (bt->cachecursor); + if( bt->maincursor ) + free (bt->maincursor); #else - if( bt->cursor) - VirtualFree (bt->cursor, 0, MEM_RELEASE); + if( bt->cachecursor) + VirtualFree (bt->cachecursor, 0, MEM_RELEASE); + if( bt->maincursor) + VirtualFree (bt->maincursor, 0, MEM_RELEASE); #endif free (bt); } @@ -1416,6 +1426,7 @@ BtVal *val; pagezero->leaf_xtra = leafxtra; bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages); + pagezero->upperpages = 1; pagezero->leafpages = 1; // initialize left-most LEAF page in @@ -1438,6 +1449,11 @@ BtVal *val; node->off <<= mgr->leaf_xtra; node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal)); + node->type = Librarian; + node++->dead = 1; + + node->off = node[-1].off; + key = keyptr(pagezero->alloc, 2); key = keyptr(pagezero->alloc, 1); key->len = 2; // create stopper key key->key[0] = 0xff; @@ -1450,7 +1466,7 @@ BtVal *val; pagezero->alloc->min = node->off; pagezero->alloc->lvl = lvl; - pagezero->alloc->cnt = 1; + pagezero->alloc->cnt = 2; pagezero->alloc->act = 1; pagezero->alloc->page_no = MIN_lvl - lvl; @@ -1520,9 +1536,11 @@ BtDb *bt = malloc (sizeof(*bt)); bt->main = main; bt->mgr = mgr; #ifdef unix - bt->cursor = valloc (mgr->page_size << mgr->leaf_xtra); + bt->cachecursor = valloc (mgr->page_size << mgr->leaf_xtra); + bt->maincursor = valloc (mgr->page_size << mgr->leaf_xtra); #else - bt->cursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE); + bt->cachecursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE); + bt->maincursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE); #endif #ifdef unix bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1; @@ -1563,11 +1581,7 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line) ReadLock (latch->readwr, thread_no); break; case BtLockWrite: -//if(latch->leaf) -//fprintf(stderr, "WrtRqst %d by %d at %d\n", (uint)latch->page_no, thread_no, line); WriteLock (latch->readwr, thread_no, line); -//if(latch->leaf) -//fprintf(stderr, "WrtLock %d by %d at %d\n", (uint)latch->page_no, thread_no, line); break; case BtLockAccess: ReadLock (latch->access, thread_no); @@ -1593,8 +1607,6 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line) ReadRelease (latch->readwr); break; case BtLockWrite: -//if(latch->leaf) -//fprintf(stderr, "Un Lock %d by %d at %d\n", (uint)latch->page_no, thread_no, line); WriteRelease (latch->readwr); break; case BtLockAccess: @@ -1621,9 +1633,10 @@ uint page_size = mgr->page_size, page_xtra = 0; unsigned char *freechain; uid page_no; - if( contents->lvl ) + if( contents->lvl ) { freechain = mgr->pagezero->freechain; - else { + mgr->pagezero->upperpages++; + } else { freechain = mgr->pagezero->leafchain; mgr->pagezero->leafpages++; page_xtra = mgr->leaf_xtra; @@ -1646,10 +1659,10 @@ uid page_no; bt_putid(freechain, bt_getid(set->page->right)); // the page is currently nopromote and this - // will keep bt_txnpromote out. + // will keep bt_promote out. // contents will replace this bit - // and pin will keep bt_txnpromote out + // and pin will keep bt_promote out contents->page_no = page_no; contents->nopromote = 0; @@ -1674,7 +1687,7 @@ uid page_no; // fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); bt_releasemutex(mgr->lock); - // keep bt_txnpromote out of this page + // keep bt_promote out of this page contents->nopromote = 1; contents->page_no = page_no; if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size ) @@ -1684,7 +1697,7 @@ uid page_no; else return mgr->err_thread = thread_no, mgr->err; - // now pin will keep bt_txnpromote out + // now pin will keep bt_promote out set->page->nopromote = 0; set->latch->dirty = 1; @@ -1858,9 +1871,10 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no) { unsigned char *freechain; - if( set->page->lvl ) + if( set->page->lvl ) { freechain = mgr->pagezero->freechain; - else { + mgr->pagezero->upperpages--; + } else { freechain = mgr->pagezero->leafchain; mgr->pagezero->leafpages--; } @@ -2016,7 +2030,7 @@ BtKey *ptr; return mgr->line = __LINE__, mgr->err = BTERR_struct; // pull contents of right peer into our empty page - // preserving our left page number. + // preserving our left page number, and its right page number. bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__); page_no = bt_getid(set->page->left); @@ -2027,6 +2041,8 @@ BtKey *ptr; set->page->page_no = set->latch->page_no; set->latch->dirty = 1; + // fix left link from far right page + if( right2 = bt_getid (right->page->right) ) { if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) ) temp->page = bt_mappage (mgr, temp->latch); @@ -2037,7 +2053,7 @@ BtKey *ptr; bt_putid (temp->page->left, set->latch->page_no); bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__); bt_unpinlatch (temp->latch, thread_no, __LINE__); - } else { // page is rightmost + } else { // our page is now rightmost bt_mutexlock (mgr->lock); bt_putid (mgr->pagezero->alloc->left, set->latch->page_no); bt_releasemutex(mgr->lock); @@ -2047,6 +2063,7 @@ BtKey *ptr; right->latch->dirty = 1; right->page->kill = 1; + bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__); // redirect higher key directly to our new node contents @@ -2056,9 +2073,7 @@ BtKey *ptr; if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) ) return mgr->err; - bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__); - - // delete orignal fence key in parent + // delete our orignal fence key in parent // and unlock our page. ptr = (BtKey *)lowerfence; @@ -2158,97 +2173,6 @@ BtVal *val; return 0; } -// advance to next slot - -uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot) -{ -BtLatchSet *prevlatch; -uid page_no; - - if( slot < set->page->cnt ) - return slot + 1; - - prevlatch = set->latch; - - if( page_no = bt_getid(set->page->right) ) - if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) ) - set->page = bt_mappage (bt->mgr, set->latch); - else - return 0; - else - return bt->mgr->err = BTERR_struct, bt->mgr->err_thread = bt->thread_no, bt->mgr->line = __LINE__, 0; - - // obtain access lock using lock chaining with Access mode - - bt_lockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__); - - bt_unlockpage(BtLockRead, prevlatch, bt->thread_no, __LINE__); - bt_unpinlatch (prevlatch, bt->thread_no, __LINE__); - - bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__); - bt_unlockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__); - return 1; -} - -// find unique key == given key, or first duplicate key in -// leaf level and return number of value bytes -// or (-1) if not found. - -int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax) -{ -BtPageSet set[1]; -uint len, slot; -int ret = -1; -BtKey *ptr; -BtVal *val; - - if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) - do { - ptr = keyptr(set->page, slot); - - // skip librarian slot place holder - - if( slotptr(set->page, slot)->type == Librarian ) - ptr = keyptr(set->page, ++slot); - - // return actual key found - - memcpy (bt->key, ptr, ptr->len + sizeof(BtKey)); - len = ptr->len; - - if( slotptr(set->page, slot)->type == Duplicate ) - len -= BtId; - - // not there if we reach the stopper key - - if( slot == set->page->cnt ) - if( !bt_getid (set->page->right) ) - break; - - // if key exists, return >= 0 value bytes copied - // otherwise return (-1) - - if( slotptr(set->page, slot)->dead ) - continue; - - if( keylen == len ) - if( !memcmp (ptr->key, key, len) ) { - val = valptr (set->page,slot); - if( valmax > val->len ) - valmax = val->len; - memcpy (value, val->value, valmax); - ret = valmax; - } - - break; - - } while( slot = bt_findnext (bt, set, slot) ); - - bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__); - bt_unpinlatch (set->latch, bt->thread_no, __LINE__); - return ret; -} - // check page for space available, // clean if necessary and return // 0 - page needs splitting @@ -2345,9 +2269,9 @@ BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread unsigned char leftkey[BT_keyarray]; uint nxt = mgr->page_size; unsigned char value[BtId]; +BtPage frame, page; BtPageSet left[1]; uid left_page_no; -BtPage frame; BtKey *ptr; BtVal *val; @@ -2369,6 +2293,11 @@ BtVal *val; bt_unpinlatch (left->latch, thread_no, __LINE__); free (frame); + // left link the pages together + + page = bt_mappage (mgr, right); + bt_putid (page->left, left_page_no); + // preserve the page info at the bottom // of higher keys and set rest to zero @@ -2431,9 +2360,9 @@ uint page_size = mgr->page_size; BtPageSet right[1], temp[1]; uint cnt = 0, idx = 0, max; uint lvl = set->page->lvl; -BtKey *key, *ptr; -BtVal *val, *src; BtPage frame; +BtKey *key; +BtVal *val; uid right2; uint entry; uint prev; @@ -2455,14 +2384,13 @@ uint prev; if( slotptr(set->page, cnt)->dead ) continue; - src = valptr(set->page, cnt); - frame->min -= src->len + sizeof(BtVal); - memcpy ((unsigned char *)frame + frame->min, src, src->len + sizeof(BtVal)); + val = valptr(set->page, cnt); + frame->min -= val->len + sizeof(BtVal); + memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal)); key = keyptr(set->page, cnt); frame->min -= key->len + sizeof(BtKey); - ptr = (BtKey*)((unsigned char *)frame + frame->min); - memcpy (ptr, key, key->len + sizeof(BtKey)); + memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey)); // add librarian slot @@ -2487,7 +2415,9 @@ uint prev; if( set->latch->page_no > ROOT_page ) { right2 = bt_getid (set->page->right); bt_putid (frame->right, right2); - bt_putid (frame->left, set->latch->page_no); + + if( linkleft ) + bt_putid (frame->left, set->latch->page_no); } // get new free page and write higher keys to it. @@ -2570,8 +2500,10 @@ BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray]; unsigned char value[BtId]; uint lvl = set->page->lvl; +BtPageSet temp[1]; BtPage page; BtKey *ptr; +uid right2; // if current page is the root page, split it @@ -2586,6 +2518,23 @@ BtKey *ptr; ptr = keyptr(page, page->cnt); memcpy (rightkey, ptr, ptr->len + sizeof(BtKey)); + // splice in far right page's left page_no + + if( right2 = bt_getid (page->right) ) { + if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) ) + temp->page = bt_mappage (mgr, temp->latch); + else + return 0; + + bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__); + bt_putid (temp->page->left, right->page_no); + bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__); + bt_unpinlatch (temp->latch, thread_no, __LINE__); + } else { // right page is far right page + bt_mutexlock (mgr->lock); + bt_putid (mgr->pagezero->alloc->left, right->page_no); + bt_releasemutex(mgr->lock); + } // insert new fences in their parent pages bt_lockpage (BtLockParent, right, thread_no, __LINE__); @@ -2649,14 +2598,16 @@ int rate; memcpy (ptr->key, key, keylen); ptr->len = keylen; - // find first empty slot + // find first empty slot at or above our insert slot for( idx = slot; idx < set->page->cnt; idx++ ) if( slotptr(set->page, idx)->dead ) break; - // now insert key into array before slot, - // adding as many librarian slots as + // now insert key into array before slot. + + // if we're going all the way to the top, + // add as many librarian slots as // makes sense. if( idx == set->page->cnt ) { @@ -2680,21 +2631,23 @@ int rate; } else librarian = 0, rate = 0; + // transfer slots and add librarian slots + while( idx > slot ) { - // transfer slot *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1); - idx--; // add librarian slot per rate if( librarian ) - if( (idx - slot + 1)/2 <= librarian * rate ) { - node = slotptr(set->page, idx--); + if( (idx - slot)/2 <= librarian * rate ) { + node = slotptr(set->page, --idx); node->off = node[1].off; node->type = Librarian; node->dead = 1; librarian--; } + + --idx; } set->latch->dirty = 1; @@ -2724,19 +2677,15 @@ BtVal *val; if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) { node = slotptr(set->page, slot); ptr = keyptr(set->page, slot); - } else { - if( !mgr->err ) - mgr->line = __LINE__, mgr->err = BTERR_ovflw; - return mgr->err_thread = thread_no, mgr->err; - } + } else + return mgr->err; // if librarian slot == found slot, advance to real slot if( node->type == Librarian ) { -// if( !keycmp (ptr, key, keylen) ) { - ptr = keyptr(set->page, ++slot); - node = slotptr(set->page, slot); - } + node = slotptr(set->page, ++slot); + ptr = keyptr(set->page, slot); + } // if inserting a duplicate key or unique // key that doesn't exist on the page, @@ -2756,7 +2705,7 @@ BtVal *val; goto insxit; if( entry = bt_splitpage (mgr, set, thread_no, 1) ) - if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) ) + if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) ) continue; return mgr->err_thread = thread_no, mgr->err; @@ -2800,7 +2749,7 @@ BtVal *val; break; if( entry = bt_splitpage (mgr, set, thread_no, 1) ) - if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) ) + if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) ) continue; return mgr->err_thread = thread_no, mgr->err; @@ -2833,14 +2782,20 @@ insxit: uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set) { BtKey *key = keyptr(source,src), *ptr; -unsigned char fence[BT_keyarray]; -uint entry, slot; +uint slot = locks[src].slot; +uint entry; if( locks[src].reuse ) entry = locks[src-1].entry; else entry = locks[src].entry; + if( slot ) { + set->latch = mgr->leafsets + entry; + set->page = bt_mappage (mgr, set->latch); + return slot; + } + // find where our key is located // on current page or pages split on // same page txn operations. @@ -2858,9 +2813,6 @@ uint entry, slot; } } while( entry = set->latch->split ); - ptr = keyptr (set->page, set->page->cnt); - memcpy (fence, ptr, ptr->len + 1); - mgr->line = __LINE__, mgr->err = BTERR_atomic; return 0; } @@ -2887,7 +2839,7 @@ uint entry, slot; if( entry = bt_splitpage (mgr, set, thread_no, 0) ) latch = mgr->leafsets + entry; else - return mgr->err_thread = thread_no, mgr->err; + return mgr->err; // splice right page into split chain // and WriteLock it @@ -2895,6 +2847,10 @@ uint entry, slot; bt_lockpage(BtLockWrite, latch, thread_no, __LINE__); latch->split = set->latch->split; set->latch->split = entry; + + // clear slot number for atomic page + + locks[src].slot = 0; } return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic; @@ -3009,7 +2965,7 @@ int type; if( bt->main ) while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no ) - if( bt_txnpromote (bt) ) + if( bt_promote (bt) ) return bt->mgr->err; // return success @@ -3021,7 +2977,7 @@ int type; BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no) { -uint src, idx, samepage, entry; +uint slot, src, idx, samepage, entry; BtPageSet set[1], prev[1]; unsigned char value[BtId]; BtLatchSet *latch; @@ -3047,14 +3003,21 @@ BtVal *val; samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0; if( !samepage ) - if( bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) ) + if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) ) ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0; else return mgr->err; + else + slot = 0; + + if( slot ) + if( slotptr(set->page, slot)->type == Librarian ) + slot++; entry = set->latch - mgr->leafsets; locks[src].reuse = samepage; locks[src].entry = entry; + locks[src].slot = slot; // capture current lsn for master page @@ -3227,7 +3190,7 @@ BtVal *val; // pick & promote a page into the larger btree -BTERR bt_txnpromote (BtDb *bt) +BTERR bt_promote (BtDb *bt) { uint entry, slot, idx; BtPageSet set[1]; @@ -3305,58 +3268,126 @@ fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_n } } +// find unique key == given key, or first duplicate key in +// leaf level and return number of value bytes +// or (-1) if not found. + +int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax) +{ +int ret = -1, type; +BtPageSet set[1]; +BtSlot *node; +BtKey *ptr; +BtVal *val; +uint slot; + + for( type = 0; type < 2; type++ ) + if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) { + node = slotptr(set->page, slot); + + // skip librarian slot place holder + + if( node->type == Librarian ) + node = slotptr(set->page, ++slot); + + ptr = keyptr(set->page, slot); + + // not there if we reach the stopper key + // or the key doesn't match what's on the page. + + if( slot == set->page->cnt ) + if( !bt_getid (set->page->right) ) { + bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__); + bt_unpinlatch (set->latch, bt->thread_no, __LINE__); + continue; + } + + if( keycmp (ptr, key, keylen) ) { + bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__); + bt_unpinlatch (set->latch, bt->thread_no, __LINE__); + continue; + } + + // key matches, return >= 0 value bytes copied + // or -1 if not there. + + if( node->type == Delete || node->dead ) { + ret = -1; + goto findxit; + } + + val = valptr (set->page,slot); + + if( valmax > val->len ) + valmax = val->len; + + memcpy (value, val->value, valmax); + ret = valmax; + goto findxit; + } + + ret = -1; + +findxit: + if( type < 2 ) { + bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__); + bt_unpinlatch (set->latch, bt->thread_no, __LINE__); + } + return ret; +} + // set cursor to highest slot on highest page -uint bt_lastkey (BtDb *bt) +uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no) { -uid page_no = bt_getid (bt->mgr->pagezero->alloc->left); +uid page_no = bt_getid (mgr->pagezero->alloc->left); BtPageSet set[1]; - if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) ) - set->page = bt_mappage (bt->mgr, set->latch); + if( set->latch = bt_pinleaf (mgr, page_no, thread_no) ) + set->page = bt_mappage (mgr, set->latch); else return 0; - bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__); - memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra); - bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__); - bt_unpinlatch (set->latch, bt->thread_no, __LINE__); - return bt->cursor->cnt; + bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__); + memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra); + bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__); + bt_unpinlatch (set->latch, thread_no, __LINE__); + return cursor->cnt; } // return previous slot on cursor page -uint bt_prevkey (BtDb *bt, uint slot) +uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no) { -uid cursor_page = bt->cursor->page_no; +uid cursor_page = cursor->page_no; uid ourright, next, us = cursor_page; BtPageSet set[1]; if( --slot ) return slot; - ourright = bt_getid(bt->cursor->right); + ourright = bt_getid(cursor->right); goleft: - if( !(next = bt_getid(bt->cursor->left)) ) + if( !(next = bt_getid(cursor->left)) ) return 0; findourself: cursor_page = next; - if( set->latch = bt_pinleaf (bt->mgr, next, bt->thread_no) ) - set->page = bt_mappage (bt->mgr, set->latch); + if( set->latch = bt_pinleaf (mgr, next, thread_no) ) + set->page = bt_mappage (mgr, set->latch); else return 0; - bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__); - memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra); - bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__); - bt_unpinlatch (set->latch, bt->thread_no, __LINE__); + bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__); + memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra); + bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__); + bt_unpinlatch (set->latch, thread_no, __LINE__); - next = bt_getid (bt->cursor->right); + next = bt_getid (cursor->right); - if( bt->cursor->kill ) + if( cursor->kill ) goto findourself; if( next != us ) @@ -3365,7 +3396,7 @@ findourself: else goto findourself; - return bt->cursor->cnt; + return cursor->cnt; } // return next slot on cursor page @@ -3377,12 +3408,12 @@ BtPageSet set[1]; uid right; do { - right = bt_getid(bt->cursor->right); + right = bt_getid(bt->cachecursor->right); - while( slot++ < bt->cursor->cnt ) - if( slotptr(bt->cursor,slot)->dead ) + while( slot++ < bt->cachecursor->cnt ) + if( slotptr(bt->cachecursor,slot)->dead ) continue; - else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper + else if( right || (slot < bt->cachecursor->cnt) ) // skip infinite stopper return slot; else break; @@ -3396,7 +3427,7 @@ uid right; return 0; bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__); - memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra); + memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra); bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__); bt_unpinlatch (set->latch, bt->thread_no, __LINE__); slot = 0; @@ -3416,7 +3447,7 @@ uint slot; // cache page for retrieval if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) ) - memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra); + memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra); else return 0; @@ -3661,55 +3692,43 @@ FILE *in; break; case 's': - fprintf(stderr, "started scanning\n"); - - do { - if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) ) - set->page = bt_mappage (bt->mgr, set->latch); - else - fprintf(stderr, "unable to obtain latch"), exit(1); - - bt_lockpage (BtLockRead, set->latch, bt->thread_no, __LINE__); - next = bt_getid (set->page->right); - - for( slot = 0; slot++ < set->page->cnt; ) - if( next || slot < set->page->cnt ) - if( !slotptr(set->page, slot)->dead ) { - ptr = keyptr(set->page, slot); - len = ptr->len; - - if( slotptr(set->page, slot)->type == Duplicate ) - len -= BtId; - - fwrite (ptr->key, len, 1, stdout); - val = valptr(set->page, slot); - fwrite (val->value, val->len, 1, stdout); - fputc ('\n', stdout); - cnt++; - } - - bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__); - bt_unpinlatch (set->latch, bt->thread_no, __LINE__); - } while( page_no = next ); + fprintf(stderr, "started forward scan\n"); + if( slot = bt_startkey (bt, NULL, 0) ) + do { + if( slotptr(bt->cachecursor, slot)->type == Librarian ) + slot++; + + ptr = keyptr(bt->cachecursor, slot); + len = ptr->len; + + if( slotptr(bt->cachecursor, slot)->type == Duplicate ) + len -= BtId; + + fwrite (ptr->key, len, 1, stdout); + val = valptr(bt->cachecursor, slot); + fwrite (val->value, val->len, 1, stdout); + fputc ('\n', stdout); + cnt++; + } while( slot = bt_nextkey (bt, slot) ); fprintf(stderr, " Total keys read %d\n", cnt); break; case 'r': fprintf(stderr, "started reverse scan\n"); - if( slot = bt_lastkey (bt) ) - while( slot = bt_prevkey (bt, slot) ) { - if( slotptr(bt->cursor, slot)->dead ) + if( slot = bt_lastkey (bt->mgr, bt->cachecursor, bt->thread_no) ) + while( slot = bt_prevkey (bt->mgr, bt->cachecursor, slot, bt->thread_no) ) { + if( slotptr(bt->cachecursor, slot)->dead ) continue; - ptr = keyptr(bt->cursor, slot); + ptr = keyptr(bt->cachecursor, slot); len = ptr->len; - if( slotptr(bt->cursor, slot)->type == Duplicate ) + if( slotptr(bt->cachecursor, slot)->type == Duplicate ) len -= BtId; fwrite (ptr->key, len, 1, stdout); - val = valptr(bt->cursor, slot); + val = valptr(bt->cachecursor, slot); fwrite (val->value, val->len, 1, stdout); fputc ('\n', stdout); cnt++; @@ -3717,28 +3736,6 @@ FILE *in; fprintf(stderr, " Total keys read %d\n", cnt); break; - - case 'c': -#ifdef unix - posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL); -#endif - fprintf(stderr, "started counting\n"); - next = bt->mgr->redopage + bt->mgr->pagezero->redopages; - - while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) { - if( bt_readpage (bt->mgr, bt->cursor, page_no, 1) ) - break; - - if( !bt->cursor->free && !bt->cursor->lvl ) - cnt += bt->cursor->act; - - bt->mgr->reads++; - page_no = next++; - } - - cnt--; // remove stopper key - fprintf(stderr, " Total keys counted %d\n", cnt); - break; } bt_close (bt); @@ -3781,7 +3778,7 @@ BtKey *ptr; fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]); fprintf (stderr, " where idx_file is the name of the cache btree file\n"); fprintf (stderr, " where main_file is the name of the main btree file\n"); - fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n"); + fprintf (stderr, " cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n"); fprintf (stderr, " pagebits is the page size in bits for the cache btree\n"); fprintf (stderr, " leafbits is the number of xtra bits for a leaf page\n"); fprintf (stderr, " poolsize is the number of pages in buffer pool for the cache btree\n"); @@ -3901,9 +3898,9 @@ BtKey *ptr; if( main ) bt_poolaudit(main, "main"); - fprintf(stderr, "cache %d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found); + fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found); if( main ) - fprintf(stderr, "main %d reads %d writes %d found\n", main->reads, main->writes, main->found); + fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found); if( main ) bt_mgrclose (main); @@ -3917,4 +3914,13 @@ BtKey *ptr; fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60); } +BtKey *bt_key (BtPage page, uint slot) +{ +return keyptr(page,slot); +} + +BtSlot *bt_slot (BtPage page, uint slot) +{ +return slotptr(page,slot); +} #endif //STANDALONE -- 2.40.0