typedef struct BtPage_ {
uint cnt; // count of keys in page
uint act; // count of active keys
- uint min; // next key offset
+ uint min; // next key/value offset
+ uint fence; // page fence key offset
uint garbage; // page garbage in bytes
- unsigned char lvl; // level of page
- unsigned char free; // page is on free chain
+ unsigned char lvl; // level of page, zero = leaf
+ unsigned char free; // page is on the free chain
unsigned char kill; // page is being deleted
unsigned char nopromote; // page is being constructed
- unsigned char filler1[6]; // padding to multiple of 8 bytes
- unsigned char right[BtId]; // page number to right
- unsigned char left[BtId]; // page number to left
- unsigned char filler2[2]; // padding to multiple of 8 bytes
+ uid right, left; // page numbers to right and left
logseqno lsn; // log sequence number applied
- uid page_no; // this page number
} *BtPage;
// The loadpage interface object
typedef struct {
struct BtPage_ alloc[1]; // next page_no in right ptr
- unsigned char freechain[BtId]; // head of free page_nos chain
- unsigned char leafchain[BtId]; // head of leaf page_nos chain
+ uid freechain; // head of free page_nos chain
+ uid leafchain; // head of leaf page_nos chain
unsigned long long leafpages; // number of active leaf pages
unsigned long long upperpages; // number of active upper pages
uint redopages; // number of redo pages in file
logseqno lsn, flushlsn; // current & first lsn flushed
MutexLatch redo[1]; // redo area lite latch
MutexLatch lock[1]; // allocation area lite latch
- MutexLatch maps[1]; // mapping segments lite latch
ushort thread_no[1]; // highest thread number issued
ushort err_thread; // error thread number
uint nlatchpage; // size of buffer pool & latchsets
uint latchtotal; // number of page latch entries
uint latchhash; // number of latch hash table slots
- uint latchvictim; // next latch entry to examine
+ uint latchvictim; // next latch entry to swap out
uint nleafpage; // size of leaf pool & leafsets
uint leaftotal; // number of leaf latch entries
uint leafhash; // number of leaf hash table slots
- uint leafvictim; // next leaf entry to examine
+ uint leafvictim; // next leaf entry to swap out
uint leafpromote; // next leaf entry to promote
uint redopage; // page number of redo buffer
uint redolast; // last msync size of recovery buff
int line; // last error line no
int found; // number of keys found by delete
int reads, writes; // number of reads and writes
+ int type; // type of LSM tree 0=cache, 1=main
#ifndef unix
HANDLE halloc; // allocation handle
HANDLE hpool; // buffer pool handle
#endif
- volatile uint segments; // number of memory mapped segments
- unsigned char *pages[64000];// memory mapped segments of b-tree
+ unsigned char *page0; // memory mapped pagezero of b-tree
} BtMgr;
typedef struct {
uint entry:31; // latch table entry number
uint reuse:1; // reused previous page
uint slot; // slot on page
+ uint src; // source slot
} AtomicTxn;
// Catastrophic errors
extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
// atomic transaction functions
-BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
+BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, logseqno lsn, int lsm, ushort thread_no);
BTERR bt_promote (BtDb *bt);
// The page is allocated from low and hi ends.
#define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
#define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
#define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
+#define fenceptr(page) ((BtKey*)((unsigned char*)(page) + page->fence))
void bt_putid(unsigned char *dest, uid id)
{
bt_releasemutex (latch->modify);
}
fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
-fprintf(stderr, "begin sync");
- for( segment = 0; segment < mgr->segments; segment++ )
- if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
- fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
-fprintf(stderr, " end sync\n");
}
// recovery manager -- process current recovery buff on startup
return lsn;
}
-// sync a single btree page to disk
-
-BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
-{
-uint segment = latch->page_no >> 16;
-uint page_size = mgr->page_size;
-BtPage perm;
-
- if( bt_writepage (mgr, page, latch->page_no, latch->leaf) )
- return mgr->err;
-
- if( !page->lvl )
- page_size <<= mgr->leaf_xtra;
-
- perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
-
- if( msync (perm, page_size, MS_SYNC) < 0 )
- fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
-
- latch->dirty = 0;
- return 0;
-}
-
// read page into buffer pool from permanent location in Btree file
BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
{
-int flag = PROT_READ | PROT_WRITE;
uint page_size = mgr->page_size;
-uint off = 0, segment, fragment;
-unsigned char *perm;
if( leaf )
page_size <<= mgr->leaf_xtra;
- fragment = page_no & 0xffff;
- segment = page_no >> 16;
- mgr->reads++;
-
- while( off < page_size ) {
- if( fragment >> 16 )
- segment++, fragment = 0;
-
- if( segment < mgr->segments ) {
- perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
-
- memcpy ((unsigned char *)page + off, perm, mgr->page_size);
- off += mgr->page_size;
- fragment++;
- continue;
- }
-
- bt_mutexlock (mgr->maps);
-
- if( segment < mgr->segments ) {
- bt_releasemutex (mgr->maps);
- continue;
- }
-
- mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
- mgr->segments++;
-
- bt_releasemutex (mgr->maps);
- }
+ if( pread(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
+ return mgr->err = BTERR_read;
+ __sync_fetch_and_add (&mgr->reads, 1);
return 0;
}
BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
{
-int flag = PROT_READ | PROT_WRITE;
uint page_size = mgr->page_size;
-uint off = 0, segment, fragment;
-unsigned char *perm;
if( leaf )
page_size <<= mgr->leaf_xtra;
- fragment = page_no & 0xffff;
- segment = page_no >> 16;
- mgr->writes++;
-
- while( off < page_size ) {
- if( fragment >> 16 )
- segment++, fragment = 0;
-
- if( segment < mgr->segments ) {
- perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
- memcpy (perm, (unsigned char *)page + off, mgr->page_size);
- off += mgr->page_size;
- fragment++;
- continue;
- }
-
- bt_mutexlock (mgr->maps);
-
- if( segment < mgr->segments ) {
- bt_releasemutex (mgr->maps);
- continue;
- }
-
- mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
- mgr->segments++;
- bt_releasemutex (mgr->maps);
- }
+ if( pwrite(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
+ return mgr->err = BTERR_wrt;
+ __sync_fetch_and_add (&mgr->writes, 1);
return 0;
}
// set CLOCK bit in latch
// decrement pin count
-int value;
-
-void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
+void bt_unpinlatch (BtLatchSet *latch, uint dirty, ushort thread_no, uint line)
{
bt_mutexlock(latch->modify);
+ if( dirty )
+ latch->dirty = 1;
latch->pin |= CLOCK_bit;
latch->pin--;
-
bt_releasemutex(latch->modify);
}
BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
{
uint hashidx = page_no % mgr->leafhash;
+uint entry, oldidx;
BtLatchSet *latch;
-uint entry, idx;
BtPage page;
// try to find our entry
entry = bt_availleaf (mgr);
latch = mgr->leafsets + entry;
- idx = latch->page_no % mgr->leafhash;
+ page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
+ oldidx = latch->page_no % mgr->leafhash;
- // if latch is on a different hash chain
- // unlink from the old page_no chain
+ // skip over this entry if old latch is not available
if( latch->page_no )
- if( idx != hashidx ) {
-
- // skip over this entry if latch not available
-
- if( !bt_mutextry (mgr->leaftable[idx].latch) ) {
+ if( oldidx != hashidx )
+ if( !bt_mutextry (mgr->leaftable[oldidx].latch) ) {
bt_releasemutex(latch->modify);
goto trynext;
}
- if( latch->prev )
- mgr->leafsets[latch->prev].next = latch->next;
- else
- mgr->leaftable[idx].entry = latch->next;
-
- if( latch->next )
- mgr->leafsets[latch->next].prev = latch->prev;
-
- bt_releasemutex (mgr->leaftable[idx].latch);
- }
-
- page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
-
// update permanent page area in btree from buffer pool
// no read-lock is required since page is not pinned.
else
latch->dirty = 0;
+ // if latch is on a different hash chain
+ // unlink from the old page_no chain
+
+ if( latch->page_no )
+ if( oldidx != hashidx ) {
+ if( latch->prev )
+ mgr->leafsets[latch->prev].next = latch->next;
+ else
+ mgr->leaftable[oldidx].entry = latch->next;
+
+ if( latch->next )
+ mgr->leafsets[latch->next].prev = latch->prev;
+
+ bt_releasemutex (mgr->leaftable[oldidx].latch);
+ }
+
if( bt_readpage (mgr, page, page_no, 1) )
- return mgr->line = __LINE__, NULL;
+ return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
// link page as head of hash table chain
// if this is a never before used entry,
// leave it in its current hash table
// chain position.
- if( !latch->page_no || hashidx != idx ) {
+ if( !latch->page_no || hashidx != oldidx ) {
if( latch->next = mgr->leaftable[hashidx].entry )
mgr->leafsets[latch->next].prev = entry;
BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
{
uint hashidx = page_no % mgr->latchhash;
+uint entry, oldidx;
BtLatchSet *latch;
-uint entry, idx;
BtPage page;
// try to find our entry
entry = bt_availnext (mgr);
latch = mgr->latchsets + entry;
- idx = latch->page_no % mgr->latchhash;
+ page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
+ oldidx = latch->page_no % mgr->latchhash;
- // if latch is on a different hash chain
- // unlink from the old page_no chain
+ // skip over this entry if latch not available
if( latch->page_no )
- if( idx != hashidx ) {
-
- // skip over this entry if latch not available
-
- if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
+ if( oldidx != hashidx )
+ if( !bt_mutextry (mgr->hashtable[oldidx].latch) ) {
bt_releasemutex(latch->modify);
goto trynext;
}
- if( latch->prev )
- mgr->latchsets[latch->prev].next = latch->next;
- else
- mgr->hashtable[idx].entry = latch->next;
-
- if( latch->next )
- mgr->latchsets[latch->next].prev = latch->prev;
-
- bt_releasemutex (mgr->hashtable[idx].latch);
- }
-
- page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
-
// update permanent page area in btree from buffer pool
// no read-lock is required since page is not pinned.
else
latch->dirty = 0;
+ // if latch is on a different hash chain
+ // unlink from the old page_no chain
+
+ if( latch->page_no )
+ if( oldidx != hashidx ) {
+ if( latch->prev )
+ mgr->latchsets[latch->prev].next = latch->next;
+ else
+ mgr->hashtable[oldidx].entry = latch->next;
+
+ if( latch->next )
+ mgr->latchsets[latch->next].prev = latch->prev;
+
+ bt_releasemutex (mgr->hashtable[oldidx].latch);
+ }
+
if( bt_readpage (mgr, page, page_no, 0) )
return mgr->line = __LINE__, NULL;
// leave it in its current hash table
// chain position.
- if( !latch->page_no || hashidx != idx ) {
+ if( !latch->page_no || hashidx != oldidx ) {
if( latch->next = mgr->hashtable[hashidx].entry )
mgr->latchsets[latch->next].prev = entry;
void bt_mgrclose (BtMgr *mgr)
{
+char *name = mgr->type ? "Main" : "Cache";
BtLatchSet *latch;
BtLogHdr *eof;
uint num = 0;
BtPage page;
-uint slot;
+uint entry;
// flush previously written dirty pages
// and write recovery buffer to disk
// write remaining dirty pool pages to the btree
- for( slot = 1; slot < mgr->latchtotal; slot++ ) {
- page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
- latch = mgr->latchsets + slot;
+ for( entry = 1; entry < mgr->latchtotal; entry++ ) {
+ page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
+ latch = mgr->latchsets + entry;
if( latch->dirty ) {
bt_writepage(mgr, page, latch->page_no, 0);
}
}
+ if( num )
+ fprintf(stderr, "%s %d non-leaf buffer pool pages flushed\n", name, num);
+
+ num = 0;
+
// write remaining dirty leaf pages to the btree
- for( slot = 1; slot < mgr->leaftotal; slot++ ) {
- page = (BtPage)(((uid)slot << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
- latch = mgr->leafsets + slot;
+ for( entry = 1; entry < mgr->leaftotal; entry++ ) {
+ page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
+ latch = mgr->leafsets + entry;
if( latch->dirty ) {
bt_writepage(mgr, page, latch->page_no, 1);
}
}
+ if( num )
+ fprintf(stderr, "%s %d leaf buffer pool pages flushed\n", name, num);
+
// clear redo recovery buffer on disk.
if( mgr->pagezero->redopages ) {
fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
}
- fprintf(stderr, "%d buffer pool pages flushed\n", num);
-
#ifdef unix
- while( mgr->segments )
- munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
+ munmap (mgr->page0, (uid)(mgr->redopage + mgr->pagezero->redopages) << mgr->page_bits);
munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
close (mgr->idx);
free (mgr);
#else
- VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
FlushFileBuffers(mgr->idx);
CloseHandle(mgr->idx);
GlobalFree (mgr);
// open/create new btree buffer manager
// call with file_name, BT_openmode, bits in page size (e.g. 16),
-// size of page pool (e.g. 262144) and leaf pool
+// size of page pool (e.g. 300) and leaf pool (e.g. 4000)
BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
{
if( pagezero->page_bits ) {
pagebits = pagezero->page_bits;
leafxtra = pagezero->leaf_xtra;
+ redopages = pagezero->redopages;
} else
initit = 1;
else
return bt_mgrclose (mgr), NULL;
pagebits = pagezero->page_bits;
leafxtra = pagezero->leaf_xtra;
+ redopages = pagezero->redopages;
} else
initit = 1;
#endif
pagezero->page_bits = mgr->page_bits;
pagezero->leaf_xtra = leafxtra;
- bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
+ pagezero->alloc->right = mgr->redopage + pagezero->redopages;
pagezero->upperpages = 1;
pagezero->leafpages = 1;
// initialize left-most LEAF page in
// alloc->left and count of active leaf pages.
- bt_putid (pagezero->alloc->left, LEAF_page);
+ pagezero->alloc->left = LEAF_page;
if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
fprintf (stderr, "Unable to create btree page zero\n");
pagezero->alloc->lvl = lvl;
pagezero->alloc->cnt = 2;
pagezero->alloc->act = 1;
- pagezero->alloc->page_no = MIN_lvl - lvl;
if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
fprintf (stderr, "Unable to create btree page\n");
// mlock the first segment of 64K pages
flag = PROT_READ | PROT_WRITE;
- mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
- mgr->segments = 1;
+ mgr->page0 = mmap (0, (uid)(mgr->redopage + redopages) << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
- if( mgr->pages[0] == MAP_FAILED ) {
- fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
+ if( mgr->page0 == MAP_FAILED ) {
+ fprintf (stderr, "Unable to mmap pagezero btree segment, error = %d\n", errno);
return bt_mgrclose (mgr), NULL;
}
- mgr->pagezero = (BtPageZero *)mgr->pages[0];
+ mgr->pagezero = (BtPageZero *)mgr->page0;
mlock (mgr->pagezero, mgr->page_size);
- mgr->redobuff = mgr->pages[0] + (mgr->redopage << mgr->page_bits);
- mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
+ mgr->redobuff = mgr->page0 + (mgr->redopage << mgr->page_bits);
+ mlock (mgr->redobuff, redopages << mgr->page_bits);
// allocate pool buffers
mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
+ for( idx = 1; idx < mgr->leaftotal; idx++ ) {
+ latch = mgr->leafsets + idx;
+ latch->leaf = 1;
+ }
+
return mgr;
}
int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
{
-uint page_size = mgr->page_size, page_xtra = 0;
-unsigned char *freechain;
+uint page_size = mgr->page_size, leaf_xtra = 0;
+uid *freechain;
uid page_no;
+ // lock allocation page
+
+ bt_mutexlock(mgr->lock);
+
if( contents->lvl ) {
- freechain = mgr->pagezero->freechain;
+ freechain = &mgr->pagezero->freechain;
mgr->pagezero->upperpages++;
} else {
- freechain = mgr->pagezero->leafchain;
+ freechain = &mgr->pagezero->leafchain;
mgr->pagezero->leafpages++;
- page_xtra = mgr->leaf_xtra;
- page_size <<= page_xtra;
+ leaf_xtra = mgr->leaf_xtra;
+ page_size <<= leaf_xtra;
}
- // lock allocation page
-
- bt_mutexlock(mgr->lock);
-
// use empty chain first
// else allocate new page
- if( page_no = bt_getid(freechain) ) {
+ if( page_no = *freechain ) {
if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
set->page = bt_mappage (mgr, set->latch);
else
return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
- bt_putid(freechain, bt_getid(set->page->right));
+ *freechain = set->page->right;
// the page is currently nopromote and this
// will keep bt_promote out.
// contents will replace this bit
// and pin will keep bt_promote out
- contents->page_no = page_no;
contents->nopromote = 0;
- set->latch->dirty = 1;
memcpy (set->page, contents, page_size);
return 0;
}
- page_no = bt_getid(mgr->pagezero->alloc->right);
- bt_putid(mgr->pagezero->alloc->right, page_no+(1 << page_xtra));
+ page_no = mgr->pagezero->alloc->right;
+ mgr->pagezero->alloc->right += 1 << leaf_xtra;
// unlock allocation latch and
// extend file into new page.
// if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
// fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+
bt_releasemutex(mgr->lock);
// keep bt_promote out of this page
contents->nopromote = 1;
- contents->page_no = page_no;
+
if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
+
if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
set->page = bt_mappage (mgr, set->latch);
else
// now pin will keep bt_promote out
set->page->nopromote = 0;
- set->latch->dirty = 1;
return 0;
}
// make stopper key an infinite fence value
- if( bt_getid (page->right) )
+ if( page->right )
higher++;
else
good++;
{
uid page_no = ROOT_page, prevpage_no = 0;
uint drill = 0xff, slot;
-BtLatchSet *prevlatch;
uint mode, prevmode;
-BtPage prevpage;
+BtPageSet prev[1];
BtVal *val;
BtKey *ptr;
// start at root of btree and drill down
do {
- // determine lock mode of drill level
- mode = (drill == lvl) ? lock : BtLockRead;
-
if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
set->page = bt_mappage (mgr, set->latch);
else
return 0;
- // obtain access lock using lock chaining with Access mode
-
if( page_no > ROOT_page )
bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
// release & unpin parent or left sibling page
if( prevpage_no ) {
- bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
- bt_unpinlatch (prevlatch, thread_no, __LINE__);
+ bt_unlockpage(prevmode, prev->latch, thread_no, __LINE__);
+ bt_unpinlatch (prev->latch, 0, thread_no, __LINE__);
prevpage_no = 0;
}
// obtain mode lock using lock coupling through AccessLock
+ // determine lock mode of drill level
+ mode = (drill == lvl) ? lock : BtLockRead;
bt_lockpage(mode, set->latch, thread_no, __LINE__);
// grab our fence key
- ptr=keyptr(set->page,set->page->cnt);
+ ptr=fenceptr(set->page);
if( set->page->free )
return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
if( lock != BtLockRead && drill == lvl ) {
bt_unlockpage(mode, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
continue;
}
}
prevpage_no = set->latch->page_no;
- prevlatch = set->latch;
- prevpage = set->page;
prevmode = mode;
+ *prev = *set;
// if requested key is beyond our fence,
// slide to the right
if( keycmp (ptr, key, len) < 0 )
- if( page_no = bt_getid(set->page->right) )
+ if( page_no = set->page->right )
continue;
// if page is part of a delete operation,
if( set->page->kill ) {
bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
- page_no = bt_getid(set->page->left);
+ page_no = set->page->left;
bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
continue;
}
// slide right into next page
- page_no = bt_getid(set->page->right);
+ page_no = set->page->right;
} while( page_no );
}
// return page to free list
-// page must be delete & write locked
+// page must be delete, link & write locked
// and have no keys pointing to it.
void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
{
-unsigned char *freechain;
+uid *freechain;
+
+ // lock allocation page
+
+ bt_mutexlock (mgr->lock);
if( set->page->lvl ) {
- freechain = mgr->pagezero->freechain;
+ freechain = &mgr->pagezero->freechain;
mgr->pagezero->upperpages--;
} else {
- freechain = mgr->pagezero->leafchain;
+ freechain = &mgr->pagezero->leafchain;
mgr->pagezero->leafpages--;
}
- // lock allocation page
-
- bt_mutexlock (mgr->lock);
+ // store chain link
- // store chain
-
- memcpy(set->page->right, freechain, BtId);
- bt_putid(freechain, set->latch->page_no);
- set->latch->dirty = 1;
+ set->page->right = *freechain;
+ *freechain = set->latch->page_no;
set->page->free = 1;
// if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
+ bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
bt_releasemutex (mgr->lock);
}
-// a fence key was deleted from a page
+// a fence key was deleted from an interiour level page
// push new fence value upwards
BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
// remove the old fence value
- ptr = keyptr(set->page, set->page->cnt);
+ ptr = fenceptr(set->page);
memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
- set->latch->dirty = 1;
+ set->page->fence = slotptr(set->page, set->page->cnt)->off;
// cache new fence value
- ptr = keyptr(set->page, set->page->cnt);
+ ptr = fenceptr(set->page);
memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
return mgr->err_thread = thread_no, mgr->err;
bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
- bt_unpinlatch(set->latch, thread_no, __LINE__);
+ bt_unpinlatch(set->latch, 1, thread_no, __LINE__);
return 0;
}
bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
memcpy (root->page, child->page, mgr->page_size);
- root->latch->dirty = 1;
-
bt_freepage (mgr, child, thread_no);
} while( root->page->lvl > 1 && root->page->act == 1 );
bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
- bt_unpinlatch (root->latch, thread_no, __LINE__);
+ bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
return 0;
}
BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
{
unsigned char lowerfence[BT_keyarray];
-uint page_size = mgr->page_size;
+uint page_size = mgr->page_size, kill;
BtPageSet right[1], temp[1];
unsigned char value[BtId];
uid page_no, right2;
if( !lvl )
page_size <<= mgr->leaf_xtra;
- // cache copy of original fence key
- // that is being deleted.
+ // cache original copy of original fence key
+ // that is going to be deleted.
- ptr = keyptr(set->page, set->page->cnt);
+ ptr = fenceptr(set->page);
memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
- // obtain locks on right page
+ // pin and lock our right page
- page_no = bt_getid(set->page->right);
+ page_no = set->page->right;
if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
right->page = bt_mappage (mgr, right->latch);
bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
- if( right->page->kill )
+ if( right->page->kill || set->page->kill )
return mgr->line = __LINE__, mgr->err = BTERR_struct;
- // pull contents of right peer into our empty page
+ // pull contents of right sibling over our empty page
// preserving our left page number, and its right page number.
bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
- page_no = bt_getid(set->page->left);
+ page_no = set->page->left;
memcpy (set->page, right->page, page_size);
- bt_putid (set->page->left, page_no);
+ set->page->left = page_no;
bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
- set->page->page_no = set->latch->page_no;
- set->latch->dirty = 1;
-
// fix left link from far right page
- if( right2 = bt_getid (right->page->right) ) {
+ if( right2 = set->page->right ) {
if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
temp->page = bt_mappage (mgr, temp->latch);
else
return 0;
+ bt_lockpage (BtLockAccess, temp->latch, thread_no, __LINE__);
bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
- bt_putid (temp->page->left, set->latch->page_no);
+ temp->page->left = set->latch->page_no;
bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
- bt_unpinlatch (temp->latch, thread_no, __LINE__);
+ bt_unlockpage(BtLockAccess, temp->latch, thread_no, __LINE__);
+ bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
} else if( !lvl ) { // our page is now rightmost leaf
bt_mutexlock (mgr->lock);
- bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
+ mgr->pagezero->alloc->left = set->latch->page_no;
bt_releasemutex(mgr->lock);
}
- // mark right page deleted and release lock
+ // mark right page as being deleted and release lock
- right->latch->dirty = 1;
right->page->kill = 1;
bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
- // redirect higher key directly to our new node contents
+ // redirect the new higher key directly to our new node
- ptr = keyptr(right->page, right->page->cnt);
+ ptr = fenceptr(set->page);
bt_putid (value, set->latch->page_no);
if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
return mgr->err;
- // delete our orignal fence key in parent
- // and unlock our page.
+ // delete our original fence key in parent
ptr = (BtKey *)lowerfence;
if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
return mgr->err;
- bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
-
- // obtain delete and write locks to right node
+ // wait for all access to drain away with delete lock,
+ // then obtain write lock to right node and free it.
bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
+ bt_lockpage (BtLockLink, right->latch, thread_no, __LINE__);
bt_freepage (mgr, right, thread_no);
+
+ // release write lock to our node
+
+ bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
return 0;
}
// did we delete a fence key in an upper level?
if( lvl && set->page->act && fence )
- if( bt_fixfence (mgr, set, lvl, thread_no) )
- return mgr->err_thread = thread_no, mgr->err;
- else
- return 0;
+ return bt_fixfence (mgr, set, lvl, thread_no);
// do we need to collapse root?
- if( set->latch->page_no == ROOT_page && set->page->act == 1 )
- if( bt_collapseroot (mgr, set, thread_no) )
- return mgr->err_thread = thread_no, mgr->err;
- else
- return 0;
+ if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
+ return bt_collapseroot (mgr, set, thread_no);
// delete empty page
if( !set->page->act )
return bt_deletepage (mgr, set, thread_no, set->page->lvl);
- set->latch->dirty = 1;
bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
return 0;
}
// skip page info and set rest of page to zero
memset (page+1, 0, page_size - sizeof(*page));
- set->latch->dirty = 1;
page->min = page_size;
page->garbage = 0;
page->act = 0;
// clean up page first by
- // removing deleted keys
+ // removing dead keys
while( cnt++ < max ) {
if( cnt == slot )
slotptr(page, idx)->type = slotptr(frame, cnt)->type;
if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
- page->act++;
+ page->act++;
}
+ page->fence = page->min;
page->cnt = idx;
free (frame);
// save left page fence key for new root
- ptr = keyptr(root->page, root->page->cnt);
+ ptr = fenceptr(root->page);
memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
// Obtain an empty page to use, and copy the current
return mgr->err_thread = thread_no, mgr->err;
left_page_no = left->latch->page_no;
- bt_unpinlatch (left->latch, thread_no, __LINE__);
+ bt_unpinlatch (left->latch, 1, thread_no, __LINE__);
free (frame);
// left link the pages together
page = bt_mappage (mgr, right);
- bt_putid (page->left, left_page_no);
+ page->left = left_page_no;
// preserve the page info at the bottom
// of higher keys and set rest to zero
val->len = BtId;
nxt -= 2 + sizeof(BtKey);
+ page->fence = nxt;
+
slotptr(root->page, 2)->off = nxt;
ptr = (BtKey *)((unsigned char *)root->page + nxt);
ptr->len = 2;
slotptr(root->page, 1)->off = nxt;
memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
- bt_putid(root->page->right, 0);
+ root->page->right = 0;
root->page->min = nxt; // reset lowest used offset and key count
root->page->cnt = 2;
root->page->act = 2;
// release and unpin root pages
bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
- bt_unpinlatch (root->latch, thread_no, __LINE__);
+ bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
- bt_unpinlatch (right, thread_no, __LINE__);
+ bt_unpinlatch (right, 1, thread_no, __LINE__);
return 0;
}
slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
- frame->act++;
+ frame->act++;
}
+ frame->fence = frame->min;
frame->cnt = idx;
frame->lvl = lvl;
// link right node
if( set->latch->page_no > ROOT_page ) {
- right2 = bt_getid (set->page->right);
- bt_putid (frame->right, right2);
+ right2 = set->page->right;
+ frame->right = right2;
if( linkleft )
- bt_putid (frame->left, set->latch->page_no);
+ frame->left = set->latch->page_no;
}
// get new free page and write higher keys to it.
return 0;
bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
- bt_putid (temp->page->left, right->latch->page_no);
+ temp->page->left = right->latch->page_no;
bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
- bt_unpinlatch (temp->latch, thread_no, __LINE__);
+ bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
} else if( !lvl ) { // page is rightmost leaf
bt_mutexlock (mgr->lock);
- bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
+ mgr->pagezero->alloc->left = right->latch->page_no;
bt_releasemutex(mgr->lock);
}
memcpy (frame, set->page, page_size);
memset (set->page+1, 0, page_size - sizeof(*set->page));
- set->latch->dirty = 1;
set->page->min = page_size;
set->page->garbage = 0;
set->page->act++;
}
- bt_putid(set->page->right, right->latch->page_no);
+ set->page->right = right->latch->page_no;
+ set->page->fence = set->page->min;
set->page->cnt = idx;
free(frame);
if( set->latch->page_no == ROOT_page )
return bt_splitroot (mgr, set, right, thread_no);
- ptr = keyptr(set->page, set->page->cnt);
+ ptr = fenceptr(set->page);
memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
page = bt_mappage (mgr, right);
- ptr = keyptr(page, page->cnt);
+ ptr = fenceptr(page);
memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
// splice in far right page's left page_no
- if( right2 = bt_getid (page->right) ) {
+ if( right2 = page->right ) {
if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
temp->page = bt_mappage (mgr, temp->latch);
else
return 0;
bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
- bt_putid (temp->page->left, right->page_no);
+ temp->page->left = right->page_no;
bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
- bt_unpinlatch (temp->latch, thread_no, __LINE__);
+ bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
} else if( !lvl ) { // right page is far right page
bt_mutexlock (mgr->lock);
- bt_putid (mgr->pagezero->alloc->left, right->page_no);
+ mgr->pagezero->alloc->left = right->page_no;
bt_releasemutex(mgr->lock);
}
// insert new fences in their parent pages
return mgr->err_thread = thread_no, mgr->err;
bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
- bt_unpinlatch (right, thread_no, __LINE__);
+ bt_unpinlatch (right, 1, thread_no, __LINE__);
return 0;
}
--idx;
}
- set->latch->dirty = 1;
set->page->act++;
// fill in new slot
node->dead = 0;
set->page->garbage += val->len - vallen;
- set->latch->dirty = 1;
val->len = vallen;
memcpy (val->value, value, vallen);
goto insxit;
memcpy (val->value, value, vallen);
val->len = vallen;
- set->latch->dirty = 1;
set->page->min -= keylen + sizeof(BtKey);
ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
memcpy (ptr->key, key, keylen);
insxit:
bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
return 0;
}
// determine actual page where key is located
// return slot number
-uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
+uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, BtPageSet *set)
{
-BtKey *key = keyptr(source,src), *ptr;
-uint slot = locks[src].slot;
+BtKey *key = keyptr(source,locks[idx].src), *ptr;
+uint slot = locks[idx].slot;
uint entry;
- if( locks[src].reuse )
- entry = locks[src-1].entry;
+ if( locks[idx].reuse )
+ entry = locks[idx-1].entry;
else
- entry = locks[src].entry;
+ entry = locks[idx].entry;
if( slot ) {
set->latch = mgr->leafsets + entry;
if( slot = bt_findslot(set->page, key->key, key->len) ) {
if( slotptr(set->page, slot)->type == Librarian )
slot++;
- if( locks[src].reuse )
- locks[src].entry = entry;
+ if( locks[idx].reuse )
+ locks[idx].entry = entry;
return slot;
}
} while( entry = set->latch->split );
return 0;
}
-BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
+BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, ushort thread_no, logseqno lsn)
{
-BtKey *key = keyptr(source, src);
-BtVal *val = valptr(source, src);
+BtKey *key = keyptr(source, locks[idx].src);
+BtVal *val = valptr(source, locks[idx].src);
BtLatchSet *latch;
BtPageSet set[1];
uint entry, slot;
- while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
+ while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) {
if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
- if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
+ if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,locks[idx].src)->type) )
return mgr->err_thread = thread_no, mgr->err;
set->page->lsn = lsn;
// clear slot number for atomic page
- locks[src].slot = 0;
+ locks[idx].slot = 0;
}
return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
// perform delete from smaller btree
// insert a delete slot if not found there
-BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
+BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, ushort thread_no, logseqno lsn)
{
-BtKey *key = keyptr(source, src);
+BtKey *key = keyptr(source, locks[idx].src);
+BtLatchSet *latch;
+uint slot, entry;
BtPageSet set[1];
-uint idx, slot;
BtSlot *node;
BtKey *ptr;
BtVal *val;
- if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
- node = slotptr(set->page, slot);
- ptr = keyptr(set->page, slot);
- val = valptr(set->page, slot);
- } else
- return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
+ while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) {
+ node = slotptr(set->page, slot);
+ ptr = keyptr(set->page, slot);
+ val = valptr(set->page, slot);
- // if slot is not found, insert a delete slot
+ // if slot is not found on cache btree, insert a delete slot
+ // otherwise ignore the request.
if( keycmp (ptr, key->key, key->len) )
- if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
- return mgr->err;
+ if( !mgr->type )
+ if( slot = bt_cleanpage(mgr, set, key->len, slot, 0) )
+ return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete);
+ else { // split page before inserting Delete slot
+ if( entry = bt_splitpage (mgr, set, thread_no, 0) )
+ latch = mgr->leafsets + entry;
+ else
+ return mgr->err;
+
+ // splice right page into split chain
+ // and WriteLock it
+
+ bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
+ latch->split = set->latch->split;
+ set->latch->split = entry;
+
+ // clear slot number for atomic page
+
+ locks[idx].slot = 0;
+ continue;
+ }
+ else
+ return 0;
// if node is already dead,
// ignore the request.
- if( node->dead )
+ if( node->type == Delete || node->dead )
return 0;
- set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
- set->latch->dirty = 1;
- set->page->lsn = lsn;
- set->page->act--;
+ // if main LSM btree, delete the slot
+ // else change to delete type.
+
+ if( mgr->type ) {
+ set->page->act--;
+ node->dead = 1;
+ } else
+ node->type = Delete;
- node->dead = 0;
__sync_fetch_and_add(&mgr->found, 1);
+ set->page->lsn = lsn;
return 0;
+ }
+
+ return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
}
// release master's splits from right to left
latch->split = 0;
bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
- bt_unpinlatch(latch, thread_no, __LINE__);
+ bt_unpinlatch(latch, 1, thread_no, __LINE__);
}
int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
// perform the individual actions in the transaction
- if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
+ if( bt_atomicexec (bt->mgr, source, source->cnt, lsn, 0, bt->thread_no) )
return bt->mgr->err;
// if number of active pages
// execute the source list of inserts/deletes
-BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
+BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, logseqno lsn, int lsm, ushort thread_no)
{
-uint slot, src, idx, samepage, entry;
+uint slot, src, idx, samepage, entry, outidx;
BtPageSet set[1], prev[1];
unsigned char value[BtId];
BtLatchSet *latch;
BtPage page;
BtVal *val;
- locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
+ locks = calloc (count, sizeof(AtomicTxn));
+ memset (set, 0, sizeof(BtPageSet));
+ outidx = 0;
// Load the leaf page for each key
// group same page references with reuse bit
- for( src = 0; src++ < source->cnt; ) {
+ for( src = 0; src++ < count; ) {
+ if( slotptr(source,src)->dead )
+ continue;
+
key = keyptr(source, src);
// first determine if this modification falls
// on the same page as the previous modification
// note that the far right leaf page is a special case
- if( samepage = src > 1 )
- samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
+ if( samepage = !!set->page )
+ samepage = !set->page->right || keycmp (ptr, key->key, key->len) >= 0;
if( !samepage )
if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
- ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
+ ptr = fenceptr(set->page), set->latch->split = 0;
else
return mgr->err;
else
slot++;
entry = set->latch - mgr->leafsets;
- locks[src].reuse = samepage;
- locks[src].entry = entry;
- locks[src].slot = slot;
+ locks[outidx].reuse = samepage;
+ locks[outidx].entry = entry;
+ locks[outidx].slot = slot;
+ locks[outidx].src = src;
// capture current lsn for master page
- locks[src].reqlsn = set->page->lsn;
+ locks[outidx++].reqlsn = set->page->lsn;
}
// insert or delete each key
// process any splits or merges
// run through txn list backwards
- samepage = source->cnt + 1;
+ samepage = outidx;
- for( src = source->cnt; src; src-- ) {
+ for( src = outidx; src--; ) {
if( locks[src].reuse )
continue;
// the same page
for( idx = src; idx < samepage; idx++ )
- switch( slotptr(source,idx)->type ) {
+ switch( slotptr(source,locks[idx].src)->type ) {
case Delete:
if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
return mgr->err;
// note that there are no pointers to it yet
if( !prev->page->act ) {
- memcpy (set->page->left, prev->page->left, BtId);
+ set->page->left = prev->page->left;
memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
+ bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
prev->latch->split = set->latch->split;
- prev->latch->dirty = 1;
bt_freepage (mgr, set, thread_no);
continue;
}
// thread has its page number yet.
if( !set->page->act ) {
- memcpy (prev->page->right, set->page->right, BtId);
+ prev->page->right = set->page->right;
prev->latch->split = set->latch->split;
bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
+ bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
bt_freepage (mgr, set, thread_no);
continue;
}
// update prev's fence key
- ptr = keyptr(prev->page,prev->page->cnt);
+ ptr = fenceptr(prev->page);
bt_putid (value, prev->latch->page_no);
if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
// splice in the left link into the split page
- bt_putid (set->page->left, prev->latch->page_no);
-
- if( lsm )
- bt_syncpage (mgr, prev->page, prev->latch);
-
+ set->page->left = prev->latch->page_no;
*prev = *set;
}
// fix left pointer in master's original (now split)
// far right sibling or set rightmost page in page zero
- if( right_page_no = bt_getid (prev->page->right) ) {
+ if( right_page_no = prev->page->right ) {
if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
set->page = bt_mappage (mgr, set->latch);
else
return mgr->err;
bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
- bt_putid (set->page->left, prev->latch->page_no);
- set->latch->dirty = 1;
-
+ set->page->left = prev->latch->page_no;
bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
} else { // prev is rightmost page
bt_mutexlock (mgr->lock);
- bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
+ mgr->pagezero->alloc->left = prev->latch->page_no;
bt_releasemutex(mgr->lock);
}
// switch the original fence key from the
// master page to the last split page.
- ptr = keyptr(prev->page,prev->page->cnt);
+ ptr = fenceptr(prev->page);
bt_putid (value, prev->latch->page_no);
if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
return mgr->err;
- if( lsm )
- bt_syncpage (mgr, prev->page, prev->latch);
-
// unlock and unpin the split pages
bt_atomicrelease (mgr, latch->split, thread_no);
latch->split = 0;
bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
- bt_unpinlatch(latch, thread_no, __LINE__);
+ bt_unpinlatch(latch, 1, thread_no, __LINE__);
continue;
}
if( prev->page->act ) {
bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
-
- if( lsm )
- bt_syncpage (mgr, prev->page, prev->latch);
-
- bt_unpinlatch(prev->latch, thread_no, __LINE__);
+ bt_unpinlatch(prev->latch, 1, thread_no, __LINE__);
continue;
}
return mgr->err;
}
+ // delete the slots
+
+ for( idx = 0; idx++ < count; ) {
+ if( slotptr(source,idx)->dead )
+ continue;
+
+ slotptr(source,idx)->dead = 1;
+ source->act--;
+ }
+
free (locks);
return 0;
}
// entry has no right sibling
- if( !bt_getid (set->page->right) ) {
+ if( !set->page->right ) {
bt_releasemutex(set->latch->modify);
continue;
}
- // entry interiour node or being killed or constructed
+ // entry is being killed or constructed
- if( set->page->lvl || set->page->nopromote || set->page->kill ) {
+ if( set->page->nopromote || set->page->kill ) {
bt_releasemutex(set->latch->modify);
continue;
}
bt_releasemutex(set->latch->modify);
// transfer slots in our selected page to the main btree
+
if( !(entry % 100) )
fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
- if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
+ if( bt_atomicexec (bt->main, set->page, set->page->cnt, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
return bt->main->err;
}
// or the key doesn't match what's on the page.
if( slot == set->page->cnt )
- if( !bt_getid (set->page->right) ) {
+ if( !set->page->right ) {
bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
continue;
}
if( keycmp (ptr, key, keylen) ) {
bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
continue;
}
findxit:
if( type < 2 ) {
bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
}
return ret;
}
-// set cursor to highest slot on left-most page
+// set cursor to highest slot on right-most page
BTERR bt_lastkey (BtDb *bt)
{
-uid cache_page_no = bt_getid (bt->mgr->pagezero->alloc->left);
-uid main_page_no = bt_getid (bt->main->pagezero->alloc->left);
+uid cache_page_no = bt->mgr->pagezero->alloc->left;
+uid main_page_no = bt->main->pagezero->alloc->left;
if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) )
bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
else
return slot;
- next = bt_getid(set->page->left);
+ next = set->page->left;
if( !next )
return 0;
do {
bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
if( set->latch = bt_pinleaf (mgr, next, thread_no) )
set->page = bt_mappage (mgr, set->latch);
return 0;
bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
- next = bt_getid (set->page->right);
+ next = set->page->right;
} while( next != us );
while( slot++ < set->page->cnt )
if( slotptr(set->page, slot)->dead )
continue;
- else if( slot < set->page->cnt || bt_getid (set->page->right) )
+ else if( slot < set->page->cnt || set->page->right )
return slot;
else
return 0;
bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
- if( page_no = bt_getid(set->page->right) )
+ if( page_no = set->page->right )
if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
set->page = bt_mappage (mgr, set->latch);
else
return 0;
// main key is larger
+ // return smaller key
if( cmp < 0 ) {
bt->phase = 0;
return 0;
}
+// flush cache pages to main btree
+
+BTERR bt_flushmain (BtDb *bt)
+{
+uint count, cnt = 0;
+BtPageSet set[1];
+
+ while( bt->mgr->pagezero->leafpages > 0 ) {
+ if( set->latch = bt_pinleaf (bt->mgr, LEAF_page, bt->thread_no) )
+ set->page = bt_mappage (bt->mgr, set->latch);
+ else
+ return bt->mgr->err;
+
+ bt_lockpage(BtLockWrite, set->latch, bt->thread_no, __LINE__);
+ count = set->page->cnt;
+
+ if( !set->page->right )
+ count--;
+
+if( !(cnt++ % 100) )
+fprintf(stderr, "Promote LEAF_page %d with %d keys\n", cnt, set->page->act);
+
+ if( bt_atomicexec (bt->main, set->page, count, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
+ return bt->mgr->line = bt->main->line, bt->mgr->err = bt->main->err;
+
+ if( set->page->right )
+ if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
+ return bt->mgr->err;
+ else
+ continue;
+
+ bt_unlockpage(BtLockWrite, set->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
+ return 0;
+ }
+
+ // leaf page count is off
+
+ bt->mgr->err_thread = bt->thread_no, bt->mgr->line = __LINE__;
+ return bt->mgr->err = BTERR_ovflw;
+}
+
#ifdef STANDALONE
#ifndef unix
for( entry = 0; ++entry < mgr->latchtotal; ) {
latch = mgr->latchsets + entry;
+ if( latch->leaf )
+ fprintf(stderr, "%s latchset %d is a leaf on page %d\n", type, entry, latch->page_no);
+
if( *latch->modify->value )
fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
for( entry = 0; ++entry < mgr->leaftotal; ) {
latch = mgr->leafsets + entry;
+ if( !latch->leaf )
+ fprintf(stderr, "%s leafset %d is not a leaf on page %d\n", type, entry, latch->page_no);
+
if( *latch->modify->value )
fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
unsigned char buff[65536];
uint nxt = sizeof(buff);
ThreadArg *args = arg;
-BtPage page, frame;
+uint counts[8][2];
BtPageSet set[1];
+BtPage page;
int vallen;
BtKey *ptr;
BtVal *val;
+uint size;
BtDb *bt;
FILE *in;
switch(ch | 0x20)
{
+ case 'm':
+ fprintf(stderr, "started flushing cache to main btree\n");
+
+ if( bt->main )
+ if( bt_flushmain(bt) )
+ fprintf(stderr, "Error %d Line: %d thread: %d\n", bt->mgr->err, bt->mgr->line, bt->thread_no), exit(0);
+
+ break;
+
case 'd':
type = Delete;
buff[nxt] = 10;
slotptr(page,++cnt)->off = nxt;
slotptr(page,cnt)->type = type;
+ slotptr(page,cnt)->dead = 0;
len = 0;
if( cnt < args->num )
}
bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (bt->cacheset->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (bt->mainset->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
fprintf(stderr, " Total keys read %d\n", cnt);
break;
}
bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (bt->cacheset->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (bt->mainset->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
fprintf(stderr, " Total keys read %d\n", cnt);
break;
case 'c':
fprintf(stderr, "started counting LSM cache btree\n");
+ next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
+ memset (counts, 0, sizeof(counts));
page_no = LEAF_page;
+ size = bt->mgr->page_size << bt->mgr->leaf_xtra;
+ page = malloc(size);
- do {
- if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
- set->page = bt_mappage (bt->mgr, set->latch);
+#ifdef unix
+ posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
+#endif
+ while( page_no < bt->mgr->pagezero->alloc->right ) {
+ pread(bt->mgr->idx, page, size, page_no << bt->mgr->page_bits);
+ if( !page->lvl && !page->free ) {
+ cnt += page->act;
+
+ for( idx = 0; idx++ < page->cnt; ) {
+ BtSlot *node = slotptr (page, idx);
+ counts[node->type][node->dead]++;
+ }
+ }
+ if( next )
+ page_no = next;
else
- return 0;
-
- bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- cnt += set->page->act;
- page_no = bt_getid (set->page->right);
- bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
- } while( page_no );
+ page_no += page->lvl ? 1 : (1 << bt->mgr->leaf_xtra);
+ next = 0;
+ }
cachecnt = --cnt; // remove stopper key
- cnt = 0;
+ counts[Unique][0]--;
+
+ fprintf(stderr, " Unique : %d dead: %d\n", counts[Unique][0], counts[Unique][1]);
+ fprintf(stderr, " Duplicates: %d dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]);
+ fprintf(stderr, " Librarian : %d dead: %d\n", counts[Librarian][0], counts[Librarian][1]);
+ fprintf(stderr, " Deletion : %d dead: %d\n", counts[Delete][0], counts[Delete][1]);
+ fprintf(stderr, "total cache keys count: %d\n", cachecnt);
+ free (page);
fprintf(stderr, "started counting LSM main btree\n");
+ next = bt->main->redopage + bt->main->pagezero->redopages;
+ memset (counts, 0, sizeof(counts));
+ size = bt->main->page_size << bt->main->leaf_xtra;
+ page = malloc(size);
page_no = LEAF_page;
+ cnt = 0;
- do {
- if( set->latch = bt_pinleaf (bt->main, page_no, bt->thread_no) )
- set->page = bt_mappage (bt->main, set->latch);
+#ifdef unix
+ posix_fadvise( bt->main->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
+#endif
+ while( page_no < bt->main->pagezero->alloc->right ) {
+ pread(bt->main->idx, page, size, page_no << bt->main->page_bits);
+ if( !page->lvl && !page->free ) {
+ cnt += page->act;
+
+ for( idx = 0; idx++ < page->cnt; ) {
+ BtSlot *node = slotptr (page, idx);
+ counts[node->type][node->dead]++;
+ }
+ }
+ if( next )
+ page_no = next;
else
- return 0;
-
- bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- cnt += set->page->act;
- page_no = bt_getid (set->page->right);
- bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
- } while( page_no );
+ page_no += page->lvl ? 1 : (1 << bt->main->leaf_xtra);
+ next = 0;
+ }
cnt--; // remove stopper key
-
- fprintf(stderr, " cache keys counted %d\n", cachecnt);
- fprintf(stderr, " main keys counted %d\n", cnt);
- fprintf(stderr, " Total keys counted %d\n", cnt + cachecnt);
+ counts[Unique][0]--;
+
+ fprintf(stderr, " Unique : %d dead: %d\n", counts[Unique][0], counts[Unique][1]);
+ fprintf(stderr, " Duplicates: %d dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]);
+ fprintf(stderr, " Librarian : %d dead: %d\n", counts[Librarian][0], counts[Librarian][1]);
+ fprintf(stderr, " Deletion : %d dead: %d\n", counts[Delete][0], counts[Delete][1]);
+ fprintf(stderr, "total main keys count : %d\n", cnt);
+ fprintf(stderr, "Total keys counted : %d\n", cnt + cachecnt);
+ free (page);
break;
}
fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
fprintf (stderr, " where idx_file is the name of the cache btree file\n");
fprintf (stderr, " where main_file is the name of the main btree file\n");
- fprintf (stderr, " cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n");
+ fprintf (stderr, " cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort/(c)ount/(m)ainflush, with a one character command for each input src_file. A command can also be given with no input file\n");
fprintf (stderr, " pagebits is the page size in bits for the cache btree\n");
fprintf (stderr, " leafbits is the number of xtra bits for a leaf page\n");
fprintf (stderr, " poolsize is the number of pages in buffer pool for the cache btree\n");
if( !mgr ) {
fprintf(stderr, "Index Open Error %s\n", argv[1]);
exit (1);
- }
+ } else
+ mgr->type = 0;
if( mainbits ) {
main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
if( !main ) {
fprintf(stderr, "Index Open Error %s\n", argv[2]);
exit (1);
- }
+ } else
+ main->type = 1;
} else
main = NULL;
if( main )
fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
+ bt_mgrclose (mgr);
+
if( main )
bt_mgrclose (main);
- bt_mgrclose (mgr);
elapsed = getCpuTime(0) - start;
fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
}
+BtKey *bt_fence (BtPage page)
+{
+return fenceptr(page);
+}
+
BtKey *bt_key (BtPage page, uint slot)
{
return keyptr(page,slot);