From 38041a3a1997e07cf9b9ed09ef6c8c1a6e60027e Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 7 Nov 2014 14:04:27 -0800 Subject: [PATCH] A few bug fixes & use native long long for page numbers --- threadskv10g.c | 412 +++++++++++++++++++++++++++++++------------------ 1 file changed, 259 insertions(+), 153 deletions(-) diff --git a/threadskv10g.c b/threadskv10g.c index 6f1d39a..42dfea2 100644 --- a/threadskv10g.c +++ b/threadskv10g.c @@ -222,18 +222,15 @@ typedef struct { typedef struct BtPage_ { uint cnt; // count of keys in page uint act; // count of active keys - uint min; // next key offset + uint min; // next key/value offset + uint fence; // page fence key offset uint garbage; // page garbage in bytes - unsigned char lvl; // level of page - unsigned char free; // page is on free chain + unsigned char lvl; // level of page, zero = leaf + unsigned char free; // page is on the free chain unsigned char kill; // page is being deleted unsigned char nopromote; // page is being constructed - unsigned char filler1[6]; // padding to multiple of 8 bytes - unsigned char right[BtId]; // page number to right - unsigned char left[BtId]; // page number to left - unsigned char filler2[2]; // padding to multiple of 8 bytes + uid right, left; // page numbers to right and left logseqno lsn; // log sequence number applied - uid page_no; // this page number } *BtPage; // The loadpage interface object @@ -247,8 +244,8 @@ typedef struct { typedef struct { struct BtPage_ alloc[1]; // next page_no in right ptr - unsigned char freechain[BtId]; // head of free page_nos chain - unsigned char leafchain[BtId]; // head of leaf page_nos chain + uid freechain; // head of free page_nos chain + uid leafchain; // head of leaf page_nos chain unsigned long long leafpages; // number of active leaf pages unsigned long long upperpages; // number of active upper pages uint redopages; // number of redo pages in file @@ -328,6 +325,7 @@ typedef struct { uint entry:31; // latch table entry number uint reuse:1; // reused previous page uint slot; // slot on page + uint src; // source slot } AtomicTxn; // Catastrophic errors @@ -395,7 +393,7 @@ extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *v extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no); // atomic transaction functions -BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no); +BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, logseqno lsn, int lsm, ushort thread_no); BTERR bt_promote (BtDb *bt); // The page is allocated from low and hi ends. @@ -447,6 +445,7 @@ BTERR bt_promote (BtDb *bt); #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1)) #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off)) #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len)) +#define fenceptr(page) ((BtKey*)((unsigned char*)(page) + page->fence)) void bt_putid(unsigned char *dest, uid id) { @@ -1344,14 +1343,14 @@ BtVal *val; pagezero->page_bits = mgr->page_bits; pagezero->leaf_xtra = leafxtra; - bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages); + pagezero->alloc->right = mgr->redopage + pagezero->redopages; pagezero->upperpages = 1; pagezero->leafpages = 1; // initialize left-most LEAF page in // alloc->left and count of active leaf pages. - bt_putid (pagezero->alloc->left, LEAF_page); + pagezero->alloc->left = LEAF_page; if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) { fprintf (stderr, "Unable to create btree page zero\n"); @@ -1387,7 +1386,6 @@ BtVal *val; pagezero->alloc->lvl = lvl; pagezero->alloc->cnt = 2; pagezero->alloc->act = 1; - pagezero->alloc->page_no = MIN_lvl - lvl; if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) { fprintf (stderr, "Unable to create btree page\n"); @@ -1546,33 +1544,33 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line) int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no) { uint page_size = mgr->page_size, leaf_xtra = 0; -unsigned char *freechain; +uid *freechain; uid page_no; + // lock allocation page + + bt_mutexlock(mgr->lock); + if( contents->lvl ) { - freechain = mgr->pagezero->freechain; + freechain = &mgr->pagezero->freechain; mgr->pagezero->upperpages++; } else { - freechain = mgr->pagezero->leafchain; + freechain = &mgr->pagezero->leafchain; mgr->pagezero->leafpages++; leaf_xtra = mgr->leaf_xtra; page_size <<= leaf_xtra; } - // lock allocation page - - bt_mutexlock(mgr->lock); - // use empty chain first // else allocate new page - if( page_no = bt_getid(freechain) ) { + if( page_no = *freechain ) { if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) ) set->page = bt_mappage (mgr, set->latch); else return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct; - bt_putid(freechain, bt_getid(set->page->right)); + *freechain = set->page->right; // the page is currently nopromote and this // will keep bt_promote out. @@ -1580,7 +1578,6 @@ uid page_no; // contents will replace this bit // and pin will keep bt_promote out - contents->page_no = page_no; contents->nopromote = 0; memcpy (set->page, contents, page_size); @@ -1592,8 +1589,8 @@ uid page_no; return 0; } - page_no = bt_getid(mgr->pagezero->alloc->right); - bt_putid(mgr->pagezero->alloc->right, page_no+(1 << leaf_xtra)); + page_no = mgr->pagezero->alloc->right; + mgr->pagezero->alloc->right += 1 << leaf_xtra; // unlock allocation latch and // extend file into new page. @@ -1605,7 +1602,6 @@ uid page_no; // keep bt_promote out of this page contents->nopromote = 1; - contents->page_no = page_no; if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size ) fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno); @@ -1630,7 +1626,7 @@ uint good = 0; // make stopper key an infinite fence value - if( bt_getid (page->right) ) + if( page->right ) higher++; else good++; @@ -1661,9 +1657,8 @@ int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint { uid page_no = ROOT_page, prevpage_no = 0; uint drill = 0xff, slot; -BtLatchSet *prevlatch; uint mode, prevmode; -BtPage prevpage; +BtPageSet prev[1]; BtVal *val; BtKey *ptr; @@ -1681,8 +1676,8 @@ BtKey *ptr; // release & unpin parent or left sibling page if( prevpage_no ) { - bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__); - bt_unpinlatch (prevlatch, 0, thread_no, __LINE__); + bt_unlockpage(prevmode, prev->latch, thread_no, __LINE__); + bt_unpinlatch (prev->latch, 0, thread_no, __LINE__); prevpage_no = 0; } @@ -1694,7 +1689,7 @@ BtKey *ptr; // grab our fence key - ptr=keyptr(set->page,set->page->cnt); + ptr=fenceptr(set->page); if( set->page->free ) return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0; @@ -1718,15 +1713,14 @@ BtKey *ptr; } prevpage_no = set->latch->page_no; - prevlatch = set->latch; - prevpage = set->page; prevmode = mode; + *prev = *set; // if requested key is beyond our fence, // slide to the right if( keycmp (ptr, key, len) < 0 ) - if( page_no = bt_getid(set->page->right) ) + if( page_no = set->page->right ) continue; // if page is part of a delete operation, @@ -1734,7 +1728,7 @@ BtKey *ptr; if( set->page->kill ) { bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__); - page_no = bt_getid(set->page->left); + page_no = set->page->left; bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__); continue; } @@ -1767,7 +1761,7 @@ BtKey *ptr; // slide right into next page - page_no = bt_getid(set->page->right); + page_no = set->page->right; } while( page_no ); @@ -1778,29 +1772,29 @@ BtKey *ptr; } // return page to free list -// page must be delete & write locked +// page must be delete, link & write locked // and have no keys pointing to it. void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no) { -unsigned char *freechain; +uid *freechain; // lock allocation page bt_mutexlock (mgr->lock); if( set->page->lvl ) { - freechain = mgr->pagezero->freechain; + freechain = &mgr->pagezero->freechain; mgr->pagezero->upperpages--; } else { - freechain = mgr->pagezero->leafchain; + freechain = &mgr->pagezero->leafchain; mgr->pagezero->leafpages--; } - // store chain + // store chain link - memcpy(set->page->right, freechain, BtId); - bt_putid(freechain, set->latch->page_no); + set->page->right = *freechain; + *freechain = set->latch->page_no; set->page->free = 1; // if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) @@ -1811,11 +1805,12 @@ unsigned char *freechain; bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__); bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__); + bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__); bt_unpinlatch (set->latch, 1, thread_no, __LINE__); bt_releasemutex (mgr->lock); } -// a fence key was deleted from a page +// a fence key was deleted from an interiour level page // push new fence value upwards BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no) @@ -1827,13 +1822,14 @@ uint idx; // remove the old fence value - ptr = keyptr(set->page, set->page->cnt); + ptr = fenceptr(set->page); memcpy (rightkey, ptr, ptr->len + sizeof(BtKey)); memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot)); + set->page->fence = slotptr(set->page, set->page->cnt)->off; // cache new fence value - ptr = keyptr(set->page, set->page->cnt); + ptr = fenceptr(set->page); memcpy (leftkey, ptr, ptr->len + sizeof(BtKey)); bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__); @@ -1910,7 +1906,7 @@ uint idx; BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl) { unsigned char lowerfence[BT_keyarray]; -uint page_size = mgr->page_size; +uint page_size = mgr->page_size, kill; BtPageSet right[1], temp[1]; unsigned char value[BtId]; uid page_no, right2; @@ -1922,12 +1918,12 @@ BtKey *ptr; // cache original copy of original fence key // that is going to be deleted. - ptr = keyptr(set->page, set->page->cnt); + ptr = fenceptr(set->page); memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey)); // pin and lock our right page - page_no = bt_getid(set->page->right); + page_no = set->page->right; if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) ) right->page = bt_mappage (mgr, right->latch); @@ -1936,34 +1932,35 @@ BtKey *ptr; bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__); - if( right->page->kill ) + if( right->page->kill || set->page->kill ) return mgr->line = __LINE__, mgr->err = BTERR_struct; // pull contents of right sibling over our empty page // preserving our left page number, and its right page number. bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__); - page_no = bt_getid(set->page->left); + page_no = set->page->left; memcpy (set->page, right->page, page_size); - bt_putid (set->page->left, page_no); - set->page->page_no = set->latch->page_no; + set->page->left = page_no; bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__); // fix left link from far right page - if( right2 = bt_getid (right->page->right) ) { + if( right2 = set->page->right ) { if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) ) temp->page = bt_mappage (mgr, temp->latch); else return 0; + bt_lockpage (BtLockAccess, temp->latch, thread_no, __LINE__); bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__); - bt_putid (temp->page->left, set->latch->page_no); + temp->page->left = set->latch->page_no; bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__); + bt_unlockpage(BtLockAccess, temp->latch, thread_no, __LINE__); bt_unpinlatch (temp->latch, 1, thread_no, __LINE__); } else if( !lvl ) { // our page is now rightmost leaf bt_mutexlock (mgr->lock); - bt_putid (mgr->pagezero->alloc->left, set->latch->page_no); + mgr->pagezero->alloc->left = set->latch->page_no; bt_releasemutex(mgr->lock); } @@ -1974,7 +1971,7 @@ BtKey *ptr; // redirect the new higher key directly to our new node - ptr = keyptr(set->page, set->page->cnt); + ptr = fenceptr(set->page); bt_putid (value, set->latch->page_no); if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) ) @@ -1987,17 +1984,18 @@ BtKey *ptr; if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) ) return mgr->err; - // release write lock to our node - - bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__); - bt_unpinlatch (set->latch, 1, thread_no, __LINE__); - // wait for all access to drain away with delete lock, // then obtain write lock to right node and free it. bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__); bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__); + bt_lockpage (BtLockLink, right->latch, thread_no, __LINE__); bt_freepage (mgr, right, thread_no); + + // release write lock to our node + + bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__); + bt_unpinlatch (set->latch, 1, thread_no, __LINE__); return 0; } @@ -2054,18 +2052,12 @@ BtVal *val; // did we delete a fence key in an upper level? if( lvl && set->page->act && fence ) - if( bt_fixfence (mgr, set, lvl, thread_no) ) - return mgr->err_thread = thread_no, mgr->err; - else - return 0; + return bt_fixfence (mgr, set, lvl, thread_no); // do we need to collapse root? - if( set->latch->page_no == ROOT_page && set->page->act == 1 ) - if( bt_collapseroot (mgr, set, thread_no) ) - return mgr->err_thread = thread_no, mgr->err; - else - return 0; + if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 ) + return bt_collapseroot (mgr, set, thread_no); // delete empty page @@ -2154,6 +2146,7 @@ BtVal *val; page->act++; } + page->fence = page->min; page->cnt = idx; free (frame); @@ -2183,7 +2176,7 @@ BtVal *val; // save left page fence key for new root - ptr = keyptr(root->page, root->page->cnt); + ptr = fenceptr(root->page); memcpy (leftkey, ptr, ptr->len + sizeof(BtKey)); // Obtain an empty page to use, and copy the current @@ -2199,7 +2192,7 @@ BtVal *val; // left link the pages together page = bt_mappage (mgr, right); - bt_putid (page->left, left_page_no); + page->left = left_page_no; // preserve the page info at the bottom // of higher keys and set rest to zero @@ -2216,6 +2209,8 @@ BtVal *val; val->len = BtId; nxt -= 2 + sizeof(BtKey); + page->fence = nxt; + slotptr(root->page, 2)->off = nxt; ptr = (BtKey *)((unsigned char *)root->page + nxt); ptr->len = 2; @@ -2235,7 +2230,7 @@ BtVal *val; slotptr(root->page, 1)->off = nxt; memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey)); - bt_putid(root->page->right, 0); + root->page->right = 0; root->page->min = nxt; // reset lowest used offset and key count root->page->cnt = 2; root->page->act = 2; @@ -2310,17 +2305,18 @@ uint prev; frame->act++; } + frame->fence = frame->min; frame->cnt = idx; frame->lvl = lvl; // link right node if( set->latch->page_no > ROOT_page ) { - right2 = bt_getid (set->page->right); - bt_putid (frame->right, right2); + right2 = set->page->right; + frame->right = right2; if( linkleft ) - bt_putid (frame->left, set->latch->page_no); + frame->left = set->latch->page_no; } // get new free page and write higher keys to it. @@ -2338,12 +2334,12 @@ uint prev; return 0; bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__); - bt_putid (temp->page->left, right->latch->page_no); + temp->page->left = right->latch->page_no; bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__); bt_unpinlatch (temp->latch, 1, thread_no, __LINE__); } else if( !lvl ) { // page is rightmost leaf bt_mutexlock (mgr->lock); - bt_putid (mgr->pagezero->alloc->left, right->latch->page_no); + mgr->pagezero->alloc->left = right->latch->page_no; bt_releasemutex(mgr->lock); } @@ -2385,7 +2381,8 @@ uint prev; set->page->act++; } - bt_putid(set->page->right, right->latch->page_no); + set->page->right = right->latch->page_no; + set->page->fence = set->page->min; set->page->cnt = idx; free(frame); @@ -2412,29 +2409,29 @@ uid right2; if( set->latch->page_no == ROOT_page ) return bt_splitroot (mgr, set, right, thread_no); - ptr = keyptr(set->page, set->page->cnt); + ptr = fenceptr(set->page); memcpy (leftkey, ptr, ptr->len + sizeof(BtKey)); page = bt_mappage (mgr, right); - ptr = keyptr(page, page->cnt); + ptr = fenceptr(page); memcpy (rightkey, ptr, ptr->len + sizeof(BtKey)); // splice in far right page's left page_no - if( right2 = bt_getid (page->right) ) { + if( right2 = page->right ) { if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) ) temp->page = bt_mappage (mgr, temp->latch); else return 0; bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__); - bt_putid (temp->page->left, right->page_no); + temp->page->left = right->page_no; bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__); bt_unpinlatch (temp->latch, 1, thread_no, __LINE__); } else if( !lvl ) { // right page is far right page bt_mutexlock (mgr->lock); - bt_putid (mgr->pagezero->alloc->left, right->page_no); + mgr->pagezero->alloc->left = right->page_no; bt_releasemutex(mgr->lock); } // insert new fences in their parent pages @@ -2678,16 +2675,16 @@ insxit: // determine actual page where key is located // return slot number -uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set) +uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, BtPageSet *set) { -BtKey *key = keyptr(source,src), *ptr; -uint slot = locks[src].slot; +BtKey *key = keyptr(source,locks[idx].src), *ptr; +uint slot = locks[idx].slot; uint entry; - if( locks[src].reuse ) - entry = locks[src-1].entry; + if( locks[idx].reuse ) + entry = locks[idx-1].entry; else - entry = locks[src].entry; + entry = locks[idx].entry; if( slot ) { set->latch = mgr->leafsets + entry; @@ -2706,8 +2703,8 @@ uint entry; if( slot = bt_findslot(set->page, key->key, key->len) ) { if( slotptr(set->page, slot)->type == Librarian ) slot++; - if( locks[src].reuse ) - locks[src].entry = entry; + if( locks[idx].reuse ) + locks[idx].entry = entry; return slot; } } while( entry = set->latch->split ); @@ -2716,17 +2713,17 @@ uint entry; return 0; } -BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn) +BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, ushort thread_no, logseqno lsn) { -BtKey *key = keyptr(source, src); -BtVal *val = valptr(source, src); +BtKey *key = keyptr(source, locks[idx].src); +BtVal *val = valptr(source, locks[idx].src); BtLatchSet *latch; BtPageSet set[1]; uint entry, slot; - while( slot = bt_atomicpage (mgr, source, locks, src, set) ) { + while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) { if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) { - if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) ) + if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,locks[idx].src)->type) ) return mgr->err_thread = thread_no, mgr->err; set->page->lsn = lsn; @@ -2749,7 +2746,7 @@ uint entry, slot; // clear slot number for atomic page - locks[src].slot = 0; + locks[idx].slot = 0; } return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic; @@ -2758,17 +2755,17 @@ uint entry, slot; // perform delete from smaller btree // insert a delete slot if not found there -BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn) +BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, ushort thread_no, logseqno lsn) { -BtKey *key = keyptr(source, src); -uint idx, slot, entry; +BtKey *key = keyptr(source, locks[idx].src); BtLatchSet *latch; +uint slot, entry; BtPageSet set[1]; BtSlot *node; BtKey *ptr; BtVal *val; - while( slot = bt_atomicpage (mgr, source, locks, src, set) ) { + while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) { node = slotptr(set->page, slot); ptr = keyptr(set->page, slot); val = valptr(set->page, slot); @@ -2795,7 +2792,7 @@ BtVal *val; // clear slot number for atomic page - locks[src].slot = 0; + locks[idx].slot = 0; continue; } else @@ -2807,14 +2804,14 @@ BtVal *val; if( node->type == Delete || node->dead ) return 0; - node->type = Delete; - // if main LSM btree, delete the slot + // else change to delete type. if( mgr->type ) { set->page->act--; node->dead = 1; - } + } else + node->type = Delete; __sync_fetch_and_add(&mgr->found, 1); set->page->lsn = lsn; @@ -2883,7 +2880,7 @@ int type; // perform the individual actions in the transaction - if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) ) + if( bt_atomicexec (bt->mgr, source, source->cnt, lsn, 0, bt->thread_no) ) return bt->mgr->err; // if number of active pages @@ -2902,9 +2899,9 @@ int type; // execute the source list of inserts/deletes -BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no) +BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, logseqno lsn, int lsm, ushort thread_no) { -uint slot, src, idx, samepage, entry; +uint slot, src, idx, samepage, entry, outidx; BtPageSet set[1], prev[1]; unsigned char value[BtId]; BtLatchSet *latch; @@ -2914,24 +2911,29 @@ BtKey *key, *ptr; BtPage page; BtVal *val; - locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); + locks = calloc (count, sizeof(AtomicTxn)); + memset (set, 0, sizeof(BtPageSet)); + outidx = 0; // Load the leaf page for each key // group same page references with reuse bit - for( src = 0; src++ < source->cnt; ) { + for( src = 0; src++ < count; ) { + if( slotptr(source,src)->dead ) + continue; + key = keyptr(source, src); // first determine if this modification falls // on the same page as the previous modification // note that the far right leaf page is a special case - if( samepage = src > 1 ) - samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0; + if( samepage = !!set->page ) + samepage = !set->page->right || keycmp (ptr, key->key, key->len) >= 0; if( !samepage ) if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) ) - ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0; + ptr = fenceptr(set->page), set->latch->split = 0; else return mgr->err; else @@ -2942,22 +2944,23 @@ BtVal *val; slot++; entry = set->latch - mgr->leafsets; - locks[src].reuse = samepage; - locks[src].entry = entry; - locks[src].slot = slot; + locks[outidx].reuse = samepage; + locks[outidx].entry = entry; + locks[outidx].slot = slot; + locks[outidx].src = src; // capture current lsn for master page - locks[src].reqlsn = set->page->lsn; + locks[outidx++].reqlsn = set->page->lsn; } // insert or delete each key // process any splits or merges // run through txn list backwards - samepage = source->cnt + 1; + samepage = outidx; - for( src = source->cnt; src; src-- ) { + for( src = outidx; src--; ) { if( locks[src].reuse ) continue; @@ -2966,7 +2969,7 @@ BtVal *val; // the same page for( idx = src; idx < samepage; idx++ ) - switch( slotptr(source,idx)->type ) { + switch( slotptr(source,locks[idx].src)->type ) { case Delete: if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) ) return mgr->err; @@ -3002,9 +3005,10 @@ BtVal *val; // note that there are no pointers to it yet if( !prev->page->act ) { - memcpy (set->page->left, prev->page->left, BtId); + set->page->left = prev->page->left; memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra); bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__); + bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__); prev->latch->split = set->latch->split; bt_freepage (mgr, set, thread_no); continue; @@ -3015,17 +3019,18 @@ BtVal *val; // thread has its page number yet. if( !set->page->act ) { - memcpy (prev->page->right, set->page->right, BtId); + prev->page->right = set->page->right; prev->latch->split = set->latch->split; bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__); + bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__); bt_freepage (mgr, set, thread_no); continue; } // update prev's fence key - ptr = keyptr(prev->page,prev->page->cnt); + ptr = fenceptr(prev->page); bt_putid (value, prev->latch->page_no); if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) @@ -3033,7 +3038,7 @@ BtVal *val; // splice in the left link into the split page - bt_putid (set->page->left, prev->latch->page_no); + set->page->left = prev->latch->page_no; *prev = *set; } @@ -3044,26 +3049,26 @@ BtVal *val; // fix left pointer in master's original (now split) // far right sibling or set rightmost page in page zero - if( right_page_no = bt_getid (prev->page->right) ) { + if( right_page_no = prev->page->right ) { if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) ) set->page = bt_mappage (mgr, set->latch); else return mgr->err; bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__); - bt_putid (set->page->left, prev->latch->page_no); + set->page->left = prev->latch->page_no; bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__); bt_unpinlatch (set->latch, 1, thread_no, __LINE__); } else { // prev is rightmost page bt_mutexlock (mgr->lock); - bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no); + mgr->pagezero->alloc->left = prev->latch->page_no; bt_releasemutex(mgr->lock); } // switch the original fence key from the // master page to the last split page. - ptr = keyptr(prev->page,prev->page->cnt); + ptr = fenceptr(prev->page); bt_putid (value, prev->latch->page_no); if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) ) @@ -3097,6 +3102,16 @@ BtVal *val; return mgr->err; } + // delete the slots + + for( idx = 0; idx++ < count; ) { + if( slotptr(source,idx)->dead ) + continue; + + slotptr(source,idx)->dead = 1; + source->act--; + } + free (locks); return 0; } @@ -3143,7 +3158,7 @@ BtVal *val; // entry has no right sibling - if( !bt_getid (set->page->right) ) { + if( !set->page->right ) { bt_releasemutex(set->latch->modify); continue; } @@ -3164,10 +3179,11 @@ BtVal *val; bt_releasemutex(set->latch->modify); // transfer slots in our selected page to the main btree + if( !(entry % 100) ) fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act); - if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) { + if( bt_atomicexec (bt->main, set->page, set->page->cnt, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) { fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line); return bt->main->err; } @@ -3209,7 +3225,7 @@ uint slot; // or the key doesn't match what's on the page. if( slot == set->page->cnt ) - if( !bt_getid (set->page->right) ) { + if( !set->page->right ) { bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__); bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__); continue; @@ -3253,8 +3269,8 @@ findxit: BTERR bt_lastkey (BtDb *bt) { -uid cache_page_no = bt_getid (bt->mgr->pagezero->alloc->left); -uid main_page_no = bt_getid (bt->main->pagezero->alloc->left); +uid cache_page_no = bt->mgr->pagezero->alloc->left; +uid main_page_no = bt->main->pagezero->alloc->left; if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) ) bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch); @@ -3288,7 +3304,7 @@ uid next, us = set->latch->page_no; else return slot; - next = bt_getid(set->page->left); + next = set->page->left; if( !next ) return 0; @@ -3303,7 +3319,7 @@ uid next, us = set->latch->page_no; return 0; bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__); - next = bt_getid (set->page->right); + next = set->page->right; } while( next != us ); @@ -3395,7 +3411,7 @@ uid page_no; while( slot++ < set->page->cnt ) if( slotptr(set->page, slot)->dead ) continue; - else if( slot < set->page->cnt || bt_getid (set->page->right) ) + else if( slot < set->page->cnt || set->page->right ) return slot; else return 0; @@ -3403,7 +3419,7 @@ uid page_no; bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__); bt_unpinlatch (set->latch, 0, thread_no, __LINE__); - if( page_no = bt_getid(set->page->right) ) + if( page_no = set->page->right ) if( set->latch = bt_pinleaf (mgr, page_no, thread_no) ) set->page = bt_mappage (mgr, set->latch); else @@ -3518,6 +3534,48 @@ uint slot; return 0; } +// flush cache pages to main btree + +BTERR bt_flushmain (BtDb *bt) +{ +uint count, cnt = 0; +BtPageSet set[1]; + + while( bt->mgr->pagezero->leafpages > 0 ) { + if( set->latch = bt_pinleaf (bt->mgr, LEAF_page, bt->thread_no) ) + set->page = bt_mappage (bt->mgr, set->latch); + else + return bt->mgr->err; + + bt_lockpage(BtLockWrite, set->latch, bt->thread_no, __LINE__); + count = set->page->cnt; + + if( !set->page->right ) + count--; + +if( !(cnt++ % 100) ) +fprintf(stderr, "Promote LEAF_page %d with %d keys\n", cnt, set->page->act); + + if( bt_atomicexec (bt->main, set->page, count, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) + return bt->mgr->line = bt->main->line, bt->mgr->err = bt->main->err; + + if( set->page->right ) + if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) ) + return bt->mgr->err; + else + continue; + + bt_unlockpage(BtLockWrite, set->latch, bt->thread_no, __LINE__); + bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__); + return 0; + } + + // leaf page count is off + + bt->mgr->err_thread = bt->thread_no, bt->mgr->line = __LINE__; + return bt->mgr->err = BTERR_ovflw; +} + #ifdef STANDALONE #ifndef unix @@ -3646,11 +3704,13 @@ unsigned char key[BT_maxkey]; unsigned char buff[65536]; uint nxt = sizeof(buff); ThreadArg *args = arg; +uint counts[8][2]; BtPageSet set[1]; BtPage page; int vallen; BtKey *ptr; BtVal *val; +uint size; BtDb *bt; FILE *in; @@ -3664,6 +3724,15 @@ FILE *in; switch(ch | 0x20) { + case 'm': + fprintf(stderr, "started flushing cache to main btree\n"); + + if( bt->main ) + if( bt_flushmain(bt) ) + fprintf(stderr, "Error %d Line: %d thread: %d\n", bt->mgr->err, bt->mgr->line, bt->thread_no), exit(0); + + break; + case 'd': type = Delete; @@ -3705,6 +3774,7 @@ FILE *in; buff[nxt] = 10; slotptr(page,++cnt)->off = nxt; slotptr(page,cnt)->type = type; + slotptr(page,cnt)->dead = 0; len = 0; if( cnt < args->num ) @@ -3837,15 +3907,24 @@ FILE *in; case 'c': fprintf(stderr, "started counting LSM cache btree\n"); next = bt->mgr->redopage + bt->mgr->pagezero->redopages; + memset (counts, 0, sizeof(counts)); page_no = LEAF_page; + size = bt->mgr->page_size << bt->mgr->leaf_xtra; + page = malloc(size); #ifdef unix posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL); #endif - while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) { - pread(bt->mgr->idx, page, sizeof(*page), page_no << bt->mgr->page_bits); - if( !page->lvl && !page->free ) + while( page_no < bt->mgr->pagezero->alloc->right ) { + pread(bt->mgr->idx, page, size, page_no << bt->mgr->page_bits); + if( !page->lvl && !page->free ) { cnt += page->act; + + for( idx = 0; idx++ < page->cnt; ) { + BtSlot *node = slotptr (page, idx); + counts[node->type][node->dead]++; + } + } if( next ) page_no = next; else @@ -3854,19 +3933,36 @@ FILE *in; } cachecnt = --cnt; // remove stopper key - cnt = 0; + counts[Unique][0]--; + + fprintf(stderr, " Unique : %d dead: %d\n", counts[Unique][0], counts[Unique][1]); + fprintf(stderr, " Duplicates: %d dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]); + fprintf(stderr, " Librarian : %d dead: %d\n", counts[Librarian][0], counts[Librarian][1]); + fprintf(stderr, " Deletion : %d dead: %d\n", counts[Delete][0], counts[Delete][1]); + fprintf(stderr, "total cache keys count: %d\n", cachecnt); + free (page); fprintf(stderr, "started counting LSM main btree\n"); next = bt->main->redopage + bt->main->pagezero->redopages; + memset (counts, 0, sizeof(counts)); + size = bt->main->page_size << bt->main->leaf_xtra; + page = malloc(size); page_no = LEAF_page; + cnt = 0; #ifdef unix posix_fadvise( bt->main->idx, 0, 0, POSIX_FADV_SEQUENTIAL); #endif - while( page_no < bt_getid(bt->main->pagezero->alloc->right) ) { - pread(bt->main->idx, page, sizeof(*page), page_no << bt->main->page_bits); - if( !page->lvl ) + while( page_no < bt->main->pagezero->alloc->right ) { + pread(bt->main->idx, page, size, page_no << bt->main->page_bits); + if( !page->lvl && !page->free ) { cnt += page->act; + + for( idx = 0; idx++ < page->cnt; ) { + BtSlot *node = slotptr (page, idx); + counts[node->type][node->dead]++; + } + } if( next ) page_no = next; else @@ -3875,10 +3971,15 @@ FILE *in; } cnt--; // remove stopper key - - fprintf(stderr, " cache keys counted %d\n", cachecnt); - fprintf(stderr, " main keys counted %d\n", cnt); - fprintf(stderr, " Total keys counted %d\n", cnt + cachecnt); + counts[Unique][0]--; + + fprintf(stderr, " Unique : %d dead: %d\n", counts[Unique][0], counts[Unique][1]); + fprintf(stderr, " Duplicates: %d dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]); + fprintf(stderr, " Librarian : %d dead: %d\n", counts[Librarian][0], counts[Librarian][1]); + fprintf(stderr, " Deletion : %d dead: %d\n", counts[Delete][0], counts[Delete][1]); + fprintf(stderr, "total main keys count : %d\n", cnt); + fprintf(stderr, "Total keys counted : %d\n", cnt + cachecnt); + free (page); break; } @@ -3922,7 +4023,7 @@ BtKey *ptr; fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]); fprintf (stderr, " where idx_file is the name of the cache btree file\n"); fprintf (stderr, " where main_file is the name of the main btree file\n"); - fprintf (stderr, " cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n"); + fprintf (stderr, " cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort/(c)ount/(m)ainflush, with a one character command for each input src_file. A command can also be given with no input file\n"); fprintf (stderr, " pagebits is the page size in bits for the cache btree\n"); fprintf (stderr, " leafbits is the number of xtra bits for a leaf page\n"); fprintf (stderr, " poolsize is the number of pages in buffer pool for the cache btree\n"); @@ -4061,6 +4162,11 @@ BtKey *ptr; fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60); } +BtKey *bt_fence (BtPage page) +{ +return fenceptr(page); +} + BtKey *bt_key (BtPage page, uint slot) { return keyptr(page,slot); -- 2.40.0