-// btree version threadskv10e futex version
+// btree version threadskv10f futex version
// with reworked bt_deletekey code,
// phase-fair re-entrant reader writer lock,
// librarian page split code,
// ACID batched key-value updates
// redo log for failure recovery
// LSM B-trees for write optimization
-// and larger sized leaf pages than non-leaf
+// larger sized leaf pages than non-leaf
+// and LSM B-tree find operations
-// 24 OCT 2014
+// 28 OCT 2014
// author: karl malbrain, malbrain@cal.berkeley.edu
typedef struct {
MutexLatch xcl[1];
MutexLatch wrt[1];
- uint readers;
+ ushort readers;
ushort dup; // re-entrant locks
ushort tid; // owner thread-no
- uint line; // owner line #
+ ushort line; // owner line #
} RWLock;
// hash table entries
typedef struct {
uid page_no; // latch set page number
- MutexLatch modify[1]; // modify entry lite latch
RWLock readwr[1]; // read/write page lock
RWLock access[1]; // Access Intent/Page delete
RWLock parent[1]; // Posting of fence key in parent
RWLock link[1]; // left link update in progress
+ MutexLatch modify[1]; // modify entry lite latch
uint split; // right split page atomic insert
uint next; // next entry in hash table chain
uint prev; // prev entry in hash table chain
unsigned char freechain[BtId]; // head of free page_nos chain
unsigned char leafchain[BtId]; // head of leaf page_nos chain
unsigned long long leafpages; // number of active leaf pages
+ unsigned long long upperpages; // number of active upper pages
uint redopages; // number of redo pages in file
unsigned char leaf_xtra; // leaf page size in xtra bits
unsigned char page_bits; // base page size in bits
typedef struct {
BtMgr *mgr; // buffer manager for entire process
BtMgr *main; // buffer manager for main btree
- BtPage cursor; // cached page frame for start/next
+ BtPage cachecursor; // cached page frame for cache btree
+ BtPage maincursor; // cached page frame for main btree
ushort thread_no; // thread number
unsigned char key[BT_keyarray]; // last found complete key
} BtDb;
logseqno reqlsn; // redo log seq no required
uint entry:31; // latch table entry number
uint reuse:1; // reused previous page
+ uint slot; // slot on page
} AtomicTxn;
// Catastrophic errors
extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
extern uint bt_nextkey (BtDb *bt, uint slot);
-extern uint bt_prevkey (BtDb *db, uint slot);
-extern uint bt_lastkey (BtDb *db);
+extern uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no);
+extern uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no);
// manager functions
extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
// atomic transaction functions
BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
-BTERR bt_txnpromote (BtDb *bt);
+BTERR bt_promote (BtDb *bt);
// The page is allocated from low and hi ends.
// The key slots are allocated from the bottom,
// Access macros to address slot and key values from the page
// Page slots use 1 based indexing.
-#define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
+#define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
#define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
#define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
void bt_releasemutex(MutexLatch *latch)
{
- *latch->bits->xcl = 0;
+MutexLatch prev[1];
- if( *latch->bits->waiters )
+ *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
+
+ if( *prev->bits->waiters )
sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
}
void bt_close (BtDb *bt)
{
#ifdef unix
- if( bt->cursor )
- free (bt->cursor);
+ if( bt->cachecursor )
+ free (bt->cachecursor);
+ if( bt->maincursor )
+ free (bt->maincursor);
#else
- if( bt->cursor)
- VirtualFree (bt->cursor, 0, MEM_RELEASE);
+ if( bt->cachecursor)
+ VirtualFree (bt->cachecursor, 0, MEM_RELEASE);
+ if( bt->maincursor)
+ VirtualFree (bt->maincursor, 0, MEM_RELEASE);
#endif
free (bt);
}
pagezero->leaf_xtra = leafxtra;
bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
+ pagezero->upperpages = 1;
pagezero->leafpages = 1;
// initialize left-most LEAF page in
node->off <<= mgr->leaf_xtra;
node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
+ node->type = Librarian;
+ node++->dead = 1;
+
+ node->off = node[-1].off;
+ key = keyptr(pagezero->alloc, 2);
key = keyptr(pagezero->alloc, 1);
key->len = 2; // create stopper key
key->key[0] = 0xff;
pagezero->alloc->min = node->off;
pagezero->alloc->lvl = lvl;
- pagezero->alloc->cnt = 1;
+ pagezero->alloc->cnt = 2;
pagezero->alloc->act = 1;
pagezero->alloc->page_no = MIN_lvl - lvl;
bt->main = main;
bt->mgr = mgr;
#ifdef unix
- bt->cursor = valloc (mgr->page_size << mgr->leaf_xtra);
+ bt->cachecursor = valloc (mgr->page_size << mgr->leaf_xtra);
+ bt->maincursor = valloc (mgr->page_size << mgr->leaf_xtra);
#else
- bt->cursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
+ bt->cachecursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
+ bt->maincursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
#endif
#ifdef unix
bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
ReadLock (latch->readwr, thread_no);
break;
case BtLockWrite:
-//if(latch->leaf)
-//fprintf(stderr, "WrtRqst %d by %d at %d\n", (uint)latch->page_no, thread_no, line);
WriteLock (latch->readwr, thread_no, line);
-//if(latch->leaf)
-//fprintf(stderr, "WrtLock %d by %d at %d\n", (uint)latch->page_no, thread_no, line);
break;
case BtLockAccess:
ReadLock (latch->access, thread_no);
ReadRelease (latch->readwr);
break;
case BtLockWrite:
-//if(latch->leaf)
-//fprintf(stderr, "Un Lock %d by %d at %d\n", (uint)latch->page_no, thread_no, line);
WriteRelease (latch->readwr);
break;
case BtLockAccess:
unsigned char *freechain;
uid page_no;
- if( contents->lvl )
+ if( contents->lvl ) {
freechain = mgr->pagezero->freechain;
- else {
+ mgr->pagezero->upperpages++;
+ } else {
freechain = mgr->pagezero->leafchain;
mgr->pagezero->leafpages++;
page_xtra = mgr->leaf_xtra;
bt_putid(freechain, bt_getid(set->page->right));
// the page is currently nopromote and this
- // will keep bt_txnpromote out.
+ // will keep bt_promote out.
// contents will replace this bit
- // and pin will keep bt_txnpromote out
+ // and pin will keep bt_promote out
contents->page_no = page_no;
contents->nopromote = 0;
// fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
bt_releasemutex(mgr->lock);
- // keep bt_txnpromote out of this page
+ // keep bt_promote out of this page
contents->nopromote = 1;
contents->page_no = page_no;
if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
else
return mgr->err_thread = thread_no, mgr->err;
- // now pin will keep bt_txnpromote out
+ // now pin will keep bt_promote out
set->page->nopromote = 0;
set->latch->dirty = 1;
{
unsigned char *freechain;
- if( set->page->lvl )
+ if( set->page->lvl ) {
freechain = mgr->pagezero->freechain;
- else {
+ mgr->pagezero->upperpages--;
+ } else {
freechain = mgr->pagezero->leafchain;
mgr->pagezero->leafpages--;
}
return mgr->line = __LINE__, mgr->err = BTERR_struct;
// pull contents of right peer into our empty page
- // preserving our left page number.
+ // preserving our left page number, and its right page number.
bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
page_no = bt_getid(set->page->left);
set->page->page_no = set->latch->page_no;
set->latch->dirty = 1;
+ // fix left link from far right page
+
if( right2 = bt_getid (right->page->right) ) {
if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
temp->page = bt_mappage (mgr, temp->latch);
bt_putid (temp->page->left, set->latch->page_no);
bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
bt_unpinlatch (temp->latch, thread_no, __LINE__);
- } else { // page is rightmost
+ } else { // our page is now rightmost
bt_mutexlock (mgr->lock);
bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
bt_releasemutex(mgr->lock);
right->latch->dirty = 1;
right->page->kill = 1;
+ bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
// redirect higher key directly to our new node contents
if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
return mgr->err;
- bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
-
- // delete orignal fence key in parent
+ // delete our orignal fence key in parent
// and unlock our page.
ptr = (BtKey *)lowerfence;
return 0;
}
-// advance to next slot
-
-uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
-{
-BtLatchSet *prevlatch;
-uid page_no;
-
- if( slot < set->page->cnt )
- return slot + 1;
-
- prevlatch = set->latch;
-
- if( page_no = bt_getid(set->page->right) )
- if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
- set->page = bt_mappage (bt->mgr, set->latch);
- else
- return 0;
- else
- return bt->mgr->err = BTERR_struct, bt->mgr->err_thread = bt->thread_no, bt->mgr->line = __LINE__, 0;
-
- // obtain access lock using lock chaining with Access mode
-
- bt_lockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__);
-
- bt_unlockpage(BtLockRead, prevlatch, bt->thread_no, __LINE__);
- bt_unpinlatch (prevlatch, bt->thread_no, __LINE__);
-
- bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- bt_unlockpage(BtLockAccess, set->latch, bt->thread_no, __LINE__);
- return 1;
-}
-
-// find unique key == given key, or first duplicate key in
-// leaf level and return number of value bytes
-// or (-1) if not found.
-
-int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
-{
-BtPageSet set[1];
-uint len, slot;
-int ret = -1;
-BtKey *ptr;
-BtVal *val;
-
- if( slot = bt_loadpage (bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) )
- do {
- ptr = keyptr(set->page, slot);
-
- // skip librarian slot place holder
-
- if( slotptr(set->page, slot)->type == Librarian )
- ptr = keyptr(set->page, ++slot);
-
- // return actual key found
-
- memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
- len = ptr->len;
-
- if( slotptr(set->page, slot)->type == Duplicate )
- len -= BtId;
-
- // not there if we reach the stopper key
-
- if( slot == set->page->cnt )
- if( !bt_getid (set->page->right) )
- break;
-
- // if key exists, return >= 0 value bytes copied
- // otherwise return (-1)
-
- if( slotptr(set->page, slot)->dead )
- continue;
-
- if( keylen == len )
- if( !memcmp (ptr->key, key, len) ) {
- val = valptr (set->page,slot);
- if( valmax > val->len )
- valmax = val->len;
- memcpy (value, val->value, valmax);
- ret = valmax;
- }
-
- break;
-
- } while( slot = bt_findnext (bt, set, slot) );
-
- bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
- return ret;
-}
-
// check page for space available,
// clean if necessary and return
// 0 - page needs splitting
unsigned char leftkey[BT_keyarray];
uint nxt = mgr->page_size;
unsigned char value[BtId];
+BtPage frame, page;
BtPageSet left[1];
uid left_page_no;
-BtPage frame;
BtKey *ptr;
BtVal *val;
bt_unpinlatch (left->latch, thread_no, __LINE__);
free (frame);
+ // left link the pages together
+
+ page = bt_mappage (mgr, right);
+ bt_putid (page->left, left_page_no);
+
// preserve the page info at the bottom
// of higher keys and set rest to zero
BtPageSet right[1], temp[1];
uint cnt = 0, idx = 0, max;
uint lvl = set->page->lvl;
-BtKey *key, *ptr;
-BtVal *val, *src;
BtPage frame;
+BtKey *key;
+BtVal *val;
uid right2;
uint entry;
uint prev;
if( slotptr(set->page, cnt)->dead )
continue;
- src = valptr(set->page, cnt);
- frame->min -= src->len + sizeof(BtVal);
- memcpy ((unsigned char *)frame + frame->min, src, src->len + sizeof(BtVal));
+ val = valptr(set->page, cnt);
+ frame->min -= val->len + sizeof(BtVal);
+ memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
key = keyptr(set->page, cnt);
frame->min -= key->len + sizeof(BtKey);
- ptr = (BtKey*)((unsigned char *)frame + frame->min);
- memcpy (ptr, key, key->len + sizeof(BtKey));
+ memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
// add librarian slot
if( set->latch->page_no > ROOT_page ) {
right2 = bt_getid (set->page->right);
bt_putid (frame->right, right2);
- bt_putid (frame->left, set->latch->page_no);
+
+ if( linkleft )
+ bt_putid (frame->left, set->latch->page_no);
}
// get new free page and write higher keys to it.
unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
unsigned char value[BtId];
uint lvl = set->page->lvl;
+BtPageSet temp[1];
BtPage page;
BtKey *ptr;
+uid right2;
// if current page is the root page, split it
ptr = keyptr(page, page->cnt);
memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
+ // splice in far right page's left page_no
+
+ if( right2 = bt_getid (page->right) ) {
+ if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
+ temp->page = bt_mappage (mgr, temp->latch);
+ else
+ return 0;
+
+ bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
+ bt_putid (temp->page->left, right->page_no);
+ bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
+ bt_unpinlatch (temp->latch, thread_no, __LINE__);
+ } else { // right page is far right page
+ bt_mutexlock (mgr->lock);
+ bt_putid (mgr->pagezero->alloc->left, right->page_no);
+ bt_releasemutex(mgr->lock);
+ }
// insert new fences in their parent pages
bt_lockpage (BtLockParent, right, thread_no, __LINE__);
memcpy (ptr->key, key, keylen);
ptr->len = keylen;
- // find first empty slot
+ // find first empty slot at or above our insert slot
for( idx = slot; idx < set->page->cnt; idx++ )
if( slotptr(set->page, idx)->dead )
break;
- // now insert key into array before slot,
- // adding as many librarian slots as
+ // now insert key into array before slot.
+
+ // if we're going all the way to the top,
+ // add as many librarian slots as
// makes sense.
if( idx == set->page->cnt ) {
} else
librarian = 0, rate = 0;
+ // transfer slots and add librarian slots
+
while( idx > slot ) {
- // transfer slot
*slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
- idx--;
// add librarian slot per rate
if( librarian )
- if( (idx - slot + 1)/2 <= librarian * rate ) {
- node = slotptr(set->page, idx--);
+ if( (idx - slot)/2 <= librarian * rate ) {
+ node = slotptr(set->page, --idx);
node->off = node[1].off;
node->type = Librarian;
node->dead = 1;
librarian--;
}
+
+ --idx;
}
set->latch->dirty = 1;
if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
node = slotptr(set->page, slot);
ptr = keyptr(set->page, slot);
- } else {
- if( !mgr->err )
- mgr->line = __LINE__, mgr->err = BTERR_ovflw;
- return mgr->err_thread = thread_no, mgr->err;
- }
+ } else
+ return mgr->err;
// if librarian slot == found slot, advance to real slot
if( node->type == Librarian ) {
-// if( !keycmp (ptr, key, keylen) ) {
- ptr = keyptr(set->page, ++slot);
- node = slotptr(set->page, slot);
- }
+ node = slotptr(set->page, ++slot);
+ ptr = keyptr(set->page, slot);
+ }
// if inserting a duplicate key or unique
// key that doesn't exist on the page,
goto insxit;
if( entry = bt_splitpage (mgr, set, thread_no, 1) )
- if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
+ if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
continue;
return mgr->err_thread = thread_no, mgr->err;
break;
if( entry = bt_splitpage (mgr, set, thread_no, 1) )
- if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
+ if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
continue;
return mgr->err_thread = thread_no, mgr->err;
uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
{
BtKey *key = keyptr(source,src), *ptr;
-unsigned char fence[BT_keyarray];
-uint entry, slot;
+uint slot = locks[src].slot;
+uint entry;
if( locks[src].reuse )
entry = locks[src-1].entry;
else
entry = locks[src].entry;
+ if( slot ) {
+ set->latch = mgr->leafsets + entry;
+ set->page = bt_mappage (mgr, set->latch);
+ return slot;
+ }
+
// find where our key is located
// on current page or pages split on
// same page txn operations.
}
} while( entry = set->latch->split );
- ptr = keyptr (set->page, set->page->cnt);
- memcpy (fence, ptr, ptr->len + 1);
-
mgr->line = __LINE__, mgr->err = BTERR_atomic;
return 0;
}
if( entry = bt_splitpage (mgr, set, thread_no, 0) )
latch = mgr->leafsets + entry;
else
- return mgr->err_thread = thread_no, mgr->err;
+ return mgr->err;
// splice right page into split chain
// and WriteLock it
bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
latch->split = set->latch->split;
set->latch->split = entry;
+
+ // clear slot number for atomic page
+
+ locks[src].slot = 0;
}
return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
if( bt->main )
while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
- if( bt_txnpromote (bt) )
+ if( bt_promote (bt) )
return bt->mgr->err;
// return success
BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
{
-uint src, idx, samepage, entry;
+uint slot, src, idx, samepage, entry;
BtPageSet set[1], prev[1];
unsigned char value[BtId];
BtLatchSet *latch;
samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
if( !samepage )
- if( bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
+ if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
else
return mgr->err;
+ else
+ slot = 0;
+
+ if( slot )
+ if( slotptr(set->page, slot)->type == Librarian )
+ slot++;
entry = set->latch - mgr->leafsets;
locks[src].reuse = samepage;
locks[src].entry = entry;
+ locks[src].slot = slot;
// capture current lsn for master page
// pick & promote a page into the larger btree
-BTERR bt_txnpromote (BtDb *bt)
+BTERR bt_promote (BtDb *bt)
{
uint entry, slot, idx;
BtPageSet set[1];
}
}
+// find unique key == given key, or first duplicate key in
+// leaf level and return number of value bytes
+// or (-1) if not found.
+
+int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
+{
+int ret = -1, type;
+BtPageSet set[1];
+BtSlot *node;
+BtKey *ptr;
+BtVal *val;
+uint slot;
+
+ for( type = 0; type < 2; type++ )
+ if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) {
+ node = slotptr(set->page, slot);
+
+ // skip librarian slot place holder
+
+ if( node->type == Librarian )
+ node = slotptr(set->page, ++slot);
+
+ ptr = keyptr(set->page, slot);
+
+ // not there if we reach the stopper key
+ // or the key doesn't match what's on the page.
+
+ if( slot == set->page->cnt )
+ if( !bt_getid (set->page->right) ) {
+ bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+ continue;
+ }
+
+ if( keycmp (ptr, key, keylen) ) {
+ bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+ continue;
+ }
+
+ // key matches, return >= 0 value bytes copied
+ // or -1 if not there.
+
+ if( node->type == Delete || node->dead ) {
+ ret = -1;
+ goto findxit;
+ }
+
+ val = valptr (set->page,slot);
+
+ if( valmax > val->len )
+ valmax = val->len;
+
+ memcpy (value, val->value, valmax);
+ ret = valmax;
+ goto findxit;
+ }
+
+ ret = -1;
+
+findxit:
+ if( type < 2 ) {
+ bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+ }
+ return ret;
+}
+
// set cursor to highest slot on highest page
-uint bt_lastkey (BtDb *bt)
+uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no)
{
-uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
+uid page_no = bt_getid (mgr->pagezero->alloc->left);
BtPageSet set[1];
- if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
- set->page = bt_mappage (bt->mgr, set->latch);
+ if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
+ set->page = bt_mappage (mgr, set->latch);
else
return 0;
- bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
- bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
- return bt->cursor->cnt;
+ bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
+ memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra);
+ bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, thread_no, __LINE__);
+ return cursor->cnt;
}
// return previous slot on cursor page
-uint bt_prevkey (BtDb *bt, uint slot)
+uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no)
{
-uid cursor_page = bt->cursor->page_no;
+uid cursor_page = cursor->page_no;
uid ourright, next, us = cursor_page;
BtPageSet set[1];
if( --slot )
return slot;
- ourright = bt_getid(bt->cursor->right);
+ ourright = bt_getid(cursor->right);
goleft:
- if( !(next = bt_getid(bt->cursor->left)) )
+ if( !(next = bt_getid(cursor->left)) )
return 0;
findourself:
cursor_page = next;
- if( set->latch = bt_pinleaf (bt->mgr, next, bt->thread_no) )
- set->page = bt_mappage (bt->mgr, set->latch);
+ if( set->latch = bt_pinleaf (mgr, next, thread_no) )
+ set->page = bt_mappage (mgr, set->latch);
else
return 0;
- bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
- bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+ bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
+ memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra);
+ bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, thread_no, __LINE__);
- next = bt_getid (bt->cursor->right);
+ next = bt_getid (cursor->right);
- if( bt->cursor->kill )
+ if( cursor->kill )
goto findourself;
if( next != us )
else
goto findourself;
- return bt->cursor->cnt;
+ return cursor->cnt;
}
// return next slot on cursor page
uid right;
do {
- right = bt_getid(bt->cursor->right);
+ right = bt_getid(bt->cachecursor->right);
- while( slot++ < bt->cursor->cnt )
- if( slotptr(bt->cursor,slot)->dead )
+ while( slot++ < bt->cachecursor->cnt )
+ if( slotptr(bt->cachecursor,slot)->dead )
continue;
- else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
+ else if( right || (slot < bt->cachecursor->cnt) ) // skip infinite stopper
return slot;
else
break;
return 0;
bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
+ memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
slot = 0;
// cache page for retrieval
if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
- memcpy (bt->cursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
+ memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
else
return 0;
break;
case 's':
- fprintf(stderr, "started scanning\n");
-
- do {
- if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
- set->page = bt_mappage (bt->mgr, set->latch);
- else
- fprintf(stderr, "unable to obtain latch"), exit(1);
-
- bt_lockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
- next = bt_getid (set->page->right);
-
- for( slot = 0; slot++ < set->page->cnt; )
- if( next || slot < set->page->cnt )
- if( !slotptr(set->page, slot)->dead ) {
- ptr = keyptr(set->page, slot);
- len = ptr->len;
-
- if( slotptr(set->page, slot)->type == Duplicate )
- len -= BtId;
-
- fwrite (ptr->key, len, 1, stdout);
- val = valptr(set->page, slot);
- fwrite (val->value, val->len, 1, stdout);
- fputc ('\n', stdout);
- cnt++;
- }
-
- bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
- } while( page_no = next );
+ fprintf(stderr, "started forward scan\n");
+ if( slot = bt_startkey (bt, NULL, 0) )
+ do {
+ if( slotptr(bt->cachecursor, slot)->type == Librarian )
+ slot++;
+
+ ptr = keyptr(bt->cachecursor, slot);
+ len = ptr->len;
+
+ if( slotptr(bt->cachecursor, slot)->type == Duplicate )
+ len -= BtId;
+
+ fwrite (ptr->key, len, 1, stdout);
+ val = valptr(bt->cachecursor, slot);
+ fwrite (val->value, val->len, 1, stdout);
+ fputc ('\n', stdout);
+ cnt++;
+ } while( slot = bt_nextkey (bt, slot) );
fprintf(stderr, " Total keys read %d\n", cnt);
break;
case 'r':
fprintf(stderr, "started reverse scan\n");
- if( slot = bt_lastkey (bt) )
- while( slot = bt_prevkey (bt, slot) ) {
- if( slotptr(bt->cursor, slot)->dead )
+ if( slot = bt_lastkey (bt->mgr, bt->cachecursor, bt->thread_no) )
+ while( slot = bt_prevkey (bt->mgr, bt->cachecursor, slot, bt->thread_no) ) {
+ if( slotptr(bt->cachecursor, slot)->dead )
continue;
- ptr = keyptr(bt->cursor, slot);
+ ptr = keyptr(bt->cachecursor, slot);
len = ptr->len;
- if( slotptr(bt->cursor, slot)->type == Duplicate )
+ if( slotptr(bt->cachecursor, slot)->type == Duplicate )
len -= BtId;
fwrite (ptr->key, len, 1, stdout);
- val = valptr(bt->cursor, slot);
+ val = valptr(bt->cachecursor, slot);
fwrite (val->value, val->len, 1, stdout);
fputc ('\n', stdout);
cnt++;
fprintf(stderr, " Total keys read %d\n", cnt);
break;
-
- case 'c':
-#ifdef unix
- posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
-#endif
- fprintf(stderr, "started counting\n");
- next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
-
- while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
- if( bt_readpage (bt->mgr, bt->cursor, page_no, 1) )
- break;
-
- if( !bt->cursor->free && !bt->cursor->lvl )
- cnt += bt->cursor->act;
-
- bt->mgr->reads++;
- page_no = next++;
- }
-
- cnt--; // remove stopper key
- fprintf(stderr, " Total keys counted %d\n", cnt);
- break;
}
bt_close (bt);
fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
fprintf (stderr, " where idx_file is the name of the cache btree file\n");
fprintf (stderr, " where main_file is the name of the main btree file\n");
- fprintf (stderr, " cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
+ fprintf (stderr, " cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n");
fprintf (stderr, " pagebits is the page size in bits for the cache btree\n");
fprintf (stderr, " leafbits is the number of xtra bits for a leaf page\n");
fprintf (stderr, " poolsize is the number of pages in buffer pool for the cache btree\n");
if( main )
bt_poolaudit(main, "main");
- fprintf(stderr, "cache %d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
+ fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found);
if( main )
- fprintf(stderr, "main %d reads %d writes %d found\n", main->reads, main->writes, main->found);
+ fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
if( main )
bt_mgrclose (main);
fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
}
+BtKey *bt_key (BtPage page, uint slot)
+{
+return keyptr(page,slot);
+}
+
+BtSlot *bt_slot (BtPage page, uint slot)
+{
+return slotptr(page,slot);
+}
#endif //STANDALONE