-// btree version threadskv10f futex version
+// btree version threadskv10g futex version
// with reworked bt_deletekey code,
// phase-fair re-entrant reader writer lock,
// librarian page split code,
// redo log for failure recovery
// LSM B-trees for write optimization
// larger sized leaf pages than non-leaf
-// and LSM B-tree find operations
+// and LSM B-tree find & count operations
// 28 OCT 2014
typedef struct {
BtMgr *mgr; // buffer manager for entire process
BtMgr *main; // buffer manager for main btree
- BtPage cachecursor; // cached page frame for cache btree
- BtPage maincursor; // cached page frame for main btree
+ BtPageSet cacheset[1]; // cached page frame for cache btree
+ BtPageSet mainset[1]; // cached page frame for main btree
+ uint cacheslot; // slot number in cacheset
+ uint mainslot; // slot number in mainset
+ ushort phase; // 1 = main btree 0 = cache btree 2 = both
ushort thread_no; // thread number
- unsigned char key[BT_keyarray]; // last found complete key
+ BtSlot *cachenode;
+ BtSlot *mainnode;
+ BtKey *cachekey;
+ BtKey *mainkey;
+ BtVal *cacheval;
+ BtVal *mainval;
} BtDb;
-// atomic txn structures
+// atomic txn structure
typedef struct {
logseqno reqlsn; // redo log seq no required
extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
-extern uint bt_startkey (BtDb *db, unsigned char *key, uint len);
-extern uint bt_nextkey (BtDb *bt, uint slot);
-extern uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no);
-extern uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no);
+extern BTERR bt_startkey (BtDb *db, unsigned char *key, uint len);
+extern BTERR bt_nextkey (BtDb *bt);
+
+extern uint bt_lastkey (BtDb *bt);
+extern uint bt_prevkey (BtDb *bt);
// manager functions
extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
void bt_close (BtDb *bt)
{
-#ifdef unix
- if( bt->cachecursor )
- free (bt->cachecursor);
- if( bt->maincursor )
- free (bt->maincursor);
-#else
- if( bt->cachecursor)
- VirtualFree (bt->cachecursor, 0, MEM_RELEASE);
- if( bt->maincursor)
- VirtualFree (bt->maincursor, 0, MEM_RELEASE);
-#endif
free (bt);
}
memset (bt, 0, sizeof(*bt));
bt->main = main;
bt->mgr = mgr;
-#ifdef unix
- bt->cachecursor = valloc (mgr->page_size << mgr->leaf_xtra);
- bt->maincursor = valloc (mgr->page_size << mgr->leaf_xtra);
-#else
- bt->cachecursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
- bt->maincursor = VirtualAlloc(NULL, mgr->page_size << mgr->leaf_xtra, MEM_COMMIT, PAGE_READWRITE);
-#endif
#ifdef unix
bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
#else
bt_putid (temp->page->left, set->latch->page_no);
bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
bt_unpinlatch (temp->latch, thread_no, __LINE__);
- } else { // our page is now rightmost
+ } else if( !lvl ) { // our page is now rightmost leaf
bt_mutexlock (mgr->lock);
bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
bt_releasemutex(mgr->lock);
val = valptr(set->page,slot);
set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
set->page->act--;
-
- // mark node type as delete
-
- node->type = Delete;
node->dead = 1;
// collapse empty slots beneath the fence
bt_putid (temp->page->left, right->latch->page_no);
bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
bt_unpinlatch (temp->latch, thread_no, __LINE__);
- } else { // page is rightmost
+ } else if( !lvl ) { // page is rightmost leaf
bt_mutexlock (mgr->lock);
bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
bt_releasemutex(mgr->lock);
bt_putid (temp->page->left, right->page_no);
bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
bt_unpinlatch (temp->latch, thread_no, __LINE__);
- } else { // right page is far right page
+ } else if( !lvl ) { // right page is far right page
bt_mutexlock (mgr->lock);
bt_putid (mgr->pagezero->alloc->left, right->page_no);
bt_releasemutex(mgr->lock);
return ret;
}
-// set cursor to highest slot on highest page
+// set cursor to highest slot on left-most page
-uint bt_lastkey (BtMgr *mgr, BtPage cursor, ushort thread_no)
+BTERR bt_lastkey (BtDb *bt)
{
-uid page_no = bt_getid (mgr->pagezero->alloc->left);
-BtPageSet set[1];
+uid cache_page_no = bt_getid (bt->mgr->pagezero->alloc->left);
+uid main_page_no = bt_getid (bt->main->pagezero->alloc->left);
- if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
- set->page = bt_mappage (mgr, set->latch);
+ if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) )
+ bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
else
- return 0;
+ return bt->mgr->err;
- bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
- memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra);
- bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
- return cursor->cnt;
+ bt_lockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
+ bt->cacheslot = bt->cacheset->page->cnt;
+
+ if( bt->mainset->latch = bt_pinleaf (bt->main, main_page_no, bt->thread_no) )
+ bt->mainset->page = bt_mappage (bt->main, bt->mainset->latch);
+ else
+ return bt->main->err;
+
+ bt_lockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
+ bt->mainslot = bt->mainset->page->cnt;
+ bt->phase = 2;
+ return 0;
}
// return previous slot on cursor page
-uint bt_prevkey (BtMgr *mgr, BtPage cursor, uint slot, ushort thread_no)
+uint bt_prevslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
{
-uid cursor_page = cursor->page_no;
-uid ourright, next, us = cursor_page;
-BtPageSet set[1];
+uid next, us = set->latch->page_no;
- if( --slot )
+ while( 1 ) {
+ while( --slot )
+ if( slotptr(set->page, slot)->dead )
+ continue;
+ else
return slot;
- ourright = bt_getid(cursor->right);
+ next = bt_getid(set->page->left);
-goleft:
- if( !(next = bt_getid(cursor->left)) )
+ if( !next )
return 0;
-findourself:
- cursor_page = next;
+ do {
+ bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, thread_no, __LINE__);
- if( set->latch = bt_pinleaf (mgr, next, thread_no) )
- set->page = bt_mappage (mgr, set->latch);
+ if( set->latch = bt_pinleaf (mgr, next, thread_no) )
+ set->page = bt_mappage (mgr, set->latch);
+ else
+ return 0;
+
+ bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
+ next = bt_getid (set->page->right);
+
+ } while( next != us );
+
+ slot = set->page->cnt + 1;
+ }
+}
+
+// advance to previous key
+
+BTERR bt_prevkey (BtDb *bt)
+{
+int cmp;
+
+ // first advance last key(s) one previous slot
+
+ while( 1 ) {
+ switch( bt->phase ) {
+ case 0:
+ bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
+ break;
+ case 1:
+ bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
+ break;
+ case 2:
+ bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
+ bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
+ break;
+ }
+
+ // return next key
+
+ if( bt->cacheslot ) {
+ bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
+ bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
+ bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
+ }
+
+ if( bt->mainslot ) {
+ bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
+ bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
+ bt->mainval = valptr(bt->mainset->page, bt->mainslot);
+ }
+
+ if( bt->mainslot && bt->cacheslot )
+ cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
+ else if( bt->cacheslot )
+ cmp = 1;
+ else if( bt->mainslot )
+ cmp = -1;
else
- return 0;
+ return 0;
- bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
- memcpy (cursor, set->page, mgr->page_size << mgr->leaf_xtra);
- bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
-
- next = bt_getid (cursor->right);
+ // cache key is larger
- if( cursor->kill )
- goto findourself;
+ if( cmp > 0 ) {
+ bt->phase = 0;
+ if( bt->cachenode->type == Delete )
+ continue;
+ return bt->cacheslot;
+ }
- if( next != us )
- if( next == ourright )
- goto goleft;
- else
- goto findourself;
+ // main key is larger
- return cursor->cnt;
+ if( cmp < 0 ) {
+ bt->phase = 1;
+ return bt->mainslot;
+ }
+
+ // keys are equal
+
+ bt->phase = 2;
+
+ if( bt->cachenode->type == Delete )
+ continue;
+
+ return bt->cacheslot;
+ }
}
-// return next slot on cursor page
-// or slide cursor right into next page
+// advance to next slot in cache or main btree
+// return 0 for EOF/error
-uint bt_nextkey (BtDb *bt, uint slot)
+uint bt_nextslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
{
-BtPageSet set[1];
-uid right;
-
- do {
- right = bt_getid(bt->cachecursor->right);
+BtPage page;
+uid page_no;
- while( slot++ < bt->cachecursor->cnt )
- if( slotptr(bt->cachecursor,slot)->dead )
+ while( 1 ) {
+ while( slot++ < set->page->cnt )
+ if( slotptr(set->page, slot)->dead )
continue;
- else if( right || (slot < bt->cachecursor->cnt) ) // skip infinite stopper
+ else if( slot < set->page->cnt || bt_getid (set->page->right) )
return slot;
else
- break;
+ return 0;
- if( !right )
- break;
+ bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, thread_no, __LINE__);
- if( set->latch = bt_pinleaf (bt->mgr, right, bt->thread_no) )
- set->page = bt_mappage (bt->mgr, set->latch);
- else
+ if( page_no = bt_getid(set->page->right) )
+ if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
+ set->page = bt_mappage (mgr, set->latch);
+ else
return 0;
+ else
+ return 0; // EOF
- bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
- bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+ // obtain access lock using lock chaining with Access mode
+
+ bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
+ bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
+ bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
slot = 0;
+ }
+}
+
+// advance to next key
+
+BTERR bt_nextkey (BtDb *bt)
+{
+int cmp;
+
+ // first advance last key(s) one next slot
+
+ while( 1 ) {
+ switch( bt->phase ) {
+ case 0:
+ bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
+ break;
+ case 1:
+ bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
+ break;
+ case 2:
+ bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
+ bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
+ break;
+ }
+
+ // return next key
+
+ if( bt->cacheslot ) {
+ bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
+ bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
+ bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
+ }
+
+ if( bt->mainslot ) {
+ bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
+ bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
+ bt->mainval = valptr(bt->mainset->page, bt->mainslot);
+ }
+
+ if( bt->mainslot && bt->cacheslot )
+ cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
+ else if( bt->mainslot )
+ cmp = 1;
+ else if( bt->cacheslot )
+ cmp = -1;
+ else
+ return 0;
- } while( 1 );
+ // main key is larger
- return bt->mgr->err = 0;
+ if( cmp < 0 ) {
+ bt->phase = 0;
+ if( bt->cachenode->type == Delete )
+ continue;
+ return bt->cacheslot;
+ }
+
+ // cache key is larger
+
+ if( cmp > 0 ) {
+ bt->phase = 1;
+ return bt->mainslot;
+ }
+
+ // keys are equal
+
+ bt->phase = 2;
+
+ if( bt->cachenode->type == Delete )
+ continue;
+
+ return bt->cacheslot;
+ }
}
-// cache page of keys into cursor and return starting slot for given key
+// start sweep of keys
-uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
+BTERR bt_startkey (BtDb *bt, unsigned char *key, uint len)
{
BtPageSet set[1];
uint slot;
- // cache page for retrieval
+ // cache btree page
- if( slot = bt_loadpage (bt->mgr, set, key, len, 0, BtLockRead, bt->thread_no) )
- memcpy (bt->cachecursor, set->page, bt->mgr->page_size << bt->mgr->leaf_xtra);
+ if( slot = bt_loadpage (bt->mgr, bt->cacheset, key, len, 0, BtLockRead, bt->thread_no) )
+ bt->cacheslot = slot - 1;
else
- return 0;
+ return bt->mgr->err;
- bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
- bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
- return slot;
+ // main btree page
+
+ if( slot = bt_loadpage (bt->main, bt->mainset, key, len, 0, BtLockRead, bt->thread_no) )
+ bt->mainslot = slot - 1;
+ else
+ return bt->mgr->err;
+
+ bt->phase = 2;
+ return 0;
}
#ifdef STANDALONE
uint __stdcall index_file (void *arg)
#endif
{
-int line = 0, found = 0, cnt = 0, idx;
+int line = 0, found = 0, cnt = 0, cachecnt, idx;
uid next, page_no = LEAF_page; // start on first page of leaves
int ch, len = 0, slot, type = 0;
unsigned char key[BT_maxkey];
-unsigned char txn[65536];
+unsigned char buff[65536];
+uint nxt = sizeof(buff);
ThreadArg *args = arg;
BtPage page, frame;
BtPageSet set[1];
-uint nxt = 65536;
+int vallen;
BtKey *ptr;
BtVal *val;
BtDb *bt;
FILE *in;
bt = bt_open (args->mgr, args->main);
- page = (BtPage)txn;
+ page = (BtPage)buff;
if( args->idx < strlen (args->type) )
ch = args->type[args->idx];
}
nxt -= len - 10;
- memcpy (txn + nxt, key + 10, len - 10);
+ memcpy (buff + nxt, key + 10, len - 10);
nxt -= 1;
- txn[nxt] = len - 10;
+ buff[nxt] = len - 10;
nxt -= 10;
- memcpy (txn + nxt, key, 10);
+ memcpy (buff + nxt, key, 10);
nxt -= 1;
- txn[nxt] = 10;
+ buff[nxt] = 10;
slotptr(page,++cnt)->off = nxt;
slotptr(page,cnt)->type = type;
len = 0;
if( bt_atomictxn (bt, page) )
fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
- nxt = sizeof(txn);
+ nxt = sizeof(buff);
cnt = 0;
}
case 's':
fprintf(stderr, "started forward scan\n");
- if( slot = bt_startkey (bt, NULL, 0) )
- do {
- if( slotptr(bt->cachecursor, slot)->type == Librarian )
- slot++;
+ if( bt_startkey (bt, NULL, 0) )
+ fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
+
+ while( bt_nextkey (bt) ) {
+ if( bt->phase == 1 ) {
+ len = bt->mainkey->len;
+
+ if( bt->mainnode->type == Duplicate )
+ len -= BtId;
- ptr = keyptr(bt->cachecursor, slot);
- len = ptr->len;
+ fwrite (bt->mainkey->key, len, 1, stdout);
+ fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
+ } else {
+ len = bt->cachekey->len;
- if( slotptr(bt->cachecursor, slot)->type == Duplicate )
+ if( bt->cachenode->type == Duplicate )
len -= BtId;
- fwrite (ptr->key, len, 1, stdout);
- val = valptr(bt->cachecursor, slot);
- fwrite (val->value, val->len, 1, stdout);
- fputc ('\n', stdout);
- cnt++;
- } while( slot = bt_nextkey (bt, slot) );
+ fwrite (bt->cachekey->key, len, 1, stdout);
+ fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
+ }
+
+ fputc ('\n', stdout);
+ cnt++;
+ }
+
+ bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (bt->cacheset->latch, bt->thread_no, __LINE__);
+
+ bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (bt->mainset->latch, bt->thread_no, __LINE__);
fprintf(stderr, " Total keys read %d\n", cnt);
break;
case 'r':
fprintf(stderr, "started reverse scan\n");
- if( slot = bt_lastkey (bt->mgr, bt->cachecursor, bt->thread_no) )
- while( slot = bt_prevkey (bt->mgr, bt->cachecursor, slot, bt->thread_no) ) {
- if( slotptr(bt->cachecursor, slot)->dead )
- continue;
+ if( bt_lastkey (bt) )
+ fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
- ptr = keyptr(bt->cachecursor, slot);
- len = ptr->len;
+ while( bt_prevkey (bt) ) {
+ if( bt->phase == 1 ) {
+ len = bt->mainkey->len;
- if( slotptr(bt->cachecursor, slot)->type == Duplicate )
+ if( bt->mainnode->type == Duplicate )
len -= BtId;
- fwrite (ptr->key, len, 1, stdout);
- val = valptr(bt->cachecursor, slot);
- fwrite (val->value, val->len, 1, stdout);
- fputc ('\n', stdout);
- cnt++;
+ fwrite (bt->mainkey->key, len, 1, stdout);
+ fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
+ } else {
+ len = bt->cachekey->len;
+
+ if( bt->cachenode->type == Duplicate )
+ len -= BtId;
+
+ fwrite (bt->cachekey->key, len, 1, stdout);
+ fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
}
+ fputc ('\n', stdout);
+ cnt++;
+ }
+
+ bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (bt->cacheset->latch, bt->thread_no, __LINE__);
+
+ bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (bt->mainset->latch, bt->thread_no, __LINE__);
+
fprintf(stderr, " Total keys read %d\n", cnt);
break;
+
+ case 'c':
+ fprintf(stderr, "started counting LSM cache btree\n");
+ page_no = LEAF_page;
+
+ do {
+ if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
+ set->page = bt_mappage (bt->mgr, set->latch);
+ else
+ return 0;
+
+ bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
+ cnt += set->page->act;
+ page_no = bt_getid (set->page->right);
+ bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+ } while( page_no );
+
+ cachecnt = --cnt; // remove stopper key
+ cnt = 0;
+
+ fprintf(stderr, "started counting LSM main btree\n");
+ page_no = LEAF_page;
+
+ do {
+ if( set->latch = bt_pinleaf (bt->main, page_no, bt->thread_no) )
+ set->page = bt_mappage (bt->main, set->latch);
+ else
+ return 0;
+
+ bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
+ cnt += set->page->act;
+ page_no = bt_getid (set->page->right);
+ bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
+ bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
+ } while( page_no );
+
+ cnt--; // remove stopper key
+
+ fprintf(stderr, " cache keys counted %d\n", cachecnt);
+ fprintf(stderr, " main keys counted %d\n", cnt);
+ fprintf(stderr, " Total keys counted %d\n", cnt + cachecnt);
+ break;
}
bt_close (bt);