// btree version threadskv1 sched_yield version
// with reworked bt_deletekey code
// and phase-fair reader writer lock
-// 12 MAR 2014
+// 08 SEP 2014
// author: karl malbrain, malbrain@cal.berkeley.edu
// B-Tree functions
extern void bt_close (BtDb *bt);
extern BtDb *bt_open (BtMgr *mgr);
-extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, unsigned char *value, uint vallen);
+extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen);
extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len);
void bt_mgrclose (BtMgr *mgr);
// Helper functions to return slot values
+// from the cursor page.
extern BtKey bt_key (BtDb *bt, uint slot);
extern BtVal bt_val (BtDb *bt, uint slot);
// BTree page number constants
-#define ALLOC_page 0 // allocation & lock manager hash table
+#define ALLOC_page 0 // allocation & latch manager hash table
#define ROOT_page 1 // root of the btree
#define LEAF_page 2 // first page of leaves
-#define LATCH_page 3 // pages for lock manager
+#define LATCH_page 3 // pages for latch manager
// Number of levels to create in a new BTree
// The page is allocated from low and hi ends.
// The key slots are allocated from the bottom,
// while the text and value of the key
-// is allocated from the top. When the two
+// are allocated from the top. When the two
// areas meet, the page is split into two.
// A key consists of a length byte, two bytes of
pool = mgr->pool + slot;
if( pool->slot )
#ifdef unix
- munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits);
+ munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
#else
{
FlushViewOfFile(pool->map, 0);
#ifdef unix
munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
- munmap (mgr->latchmgr, mgr->page_size);
+ munmap (mgr->latchmgr, 2 * mgr->page_size);
#else
FlushViewOfFile(mgr->latchmgr, 0);
UnmapViewOfFile(mgr->latchmgr);
mgrlatch:
#ifdef unix
+ // mlock the root page and the latchmgr page
+
flag = PROT_READ | PROT_WRITE;
- mgr->latchmgr = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
+ mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
if( mgr->latchmgr == MAP_FAILED )
return bt_mgrclose (mgr), NULL;
+ mlock (mgr->latchmgr, 2 * mgr->page_size);
+
mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
if( mgr->latchsets == MAP_FAILED )
return bt_mgrclose (mgr), NULL;
+ mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
#else
flag = PAGE_READWRITE;
mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
#ifdef unix
flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
- pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
+ pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
if( pool->map == MAP_FAILED )
return bt->err = BTERR_map;
return bt->err = BTERR_map;
flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
- pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
+ pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
if( !pool->map )
return bt->err = BTERR_map;
#endif
BtPage page;
page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
+// madvise (page, bt->mgr->page_size, MADV_WILLNEED);
return page;
}
// remove old file mapping
#ifdef unix
- munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
+ munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
#else
// FlushViewOfFile(pool->map, 0);
UnmapViewOfFile(pool->map);
// find key on page at this level
// and descend to requested level
- if( !set->page->kill )
- if( slot = bt_findslot (set, key, len) ) {
+ if( set->page->kill )
+ goto slideright;
+
+ if( slot = bt_findslot (set, key, len) ) {
if( drill == lvl )
return slot;
page_no = bt_getid(valptr(set->page, slot)->value);
drill--;
continue;
- }
+ }
// or slide right into next page
{
unsigned char leftkey[256], rightkey[256];
unsigned char value[BtId];
-uid page_no;
BtKey ptr;
// remove the old fence value
ptr = keyptr(set->page, set->page->cnt);
memcpy (leftkey, ptr, ptr->len + 1);
- page_no = set->page_no;
bt_lockpage (BtLockParent, set->latch);
bt_unlockpage (BtLockWrite, set->latch);
// insert new (now smaller) fence key
- bt_putid (value, page_no);
+ bt_putid (value, set->page_no);
if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) )
return bt->err;
BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
{
unsigned char lowerfence[256], higherfence[256];
-uint slot, idx, dirty = 0, fence, found;
+uint slot, idx, found, fence;
BtPageSet set[1], right[1];
unsigned char value[BtId];
BtKey ptr;
else
return bt->err;
- // are we deleting a fence slot?
+ // are we deleting a fence key?
fence = slot == set->page->cnt;
if( found = !keycmp (ptr, key, len) )
if( found = slotptr(set->page, slot)->dead == 0 ) {
- dirty = slotptr(set->page, slot)->dead = 1;
+ slotptr(set->page, slot)->dead = 1;
set->page->dirty = 1;
set->page->act--;
- // collapse empty slots
+ // collapse empty slots beneath our fence
while( idx = set->page->cnt - 1 )
if( slotptr(set->page, idx)->dead ) {
// did we delete a fence key in an upper level?
- if( dirty && lvl && set->page->act && fence )
+ if( found && fence && lvl && set->page->act )
if( bt_fixfence (bt, set, lvl) )
return bt->err;
else
else
return 0;
- // if key exists, return TRUE
- // otherwise return FALSE
+ // if key exists, return >= 0 value bytes copied
+ // otherwise return (-1)
- if( !keycmp (ptr, key, keylen) ) {
+ if( !slotptr(set->page, slot)->dead && !keycmp (ptr, key, keylen) ) {
val = valptr (set->page,slot);
if( valmax > val->len )
valmax = val->len;
if( cnt < max && slotptr(bt->frame,cnt)->dead )
continue;
- // copy the key across
-
- key = keyptr(bt->frame, cnt);
- nxt -= key->len + 1;
- memcpy ((unsigned char *)page + nxt, key, key->len + 1);
-
// copy the value across
val = valptr(bt->frame, cnt);
nxt -= val->len + 1;
((unsigned char *)page)[nxt] = val->len;
- memcpy ((unsigned char *)page + nxt + 1, val, val->len);
+ memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
+
+ // copy the key across
+
+ key = keyptr(bt->frame, cnt);
+ nxt -= key->len + 1;
+ memcpy ((unsigned char *)page + nxt, key, key->len + 1);
// set up the slot
- slotptr(page, idx)->off = nxt;
+ slotptr(page, ++idx)->off = nxt;
if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
page->act++;
bt_unpinlatch (right->latch);
return 0;
}
-// Insert new key into the btree at given level.
+// install new key and value onto page
+// page must already be checked for
+// adequate space
-BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, unsigned char *value, uint vallen)
+BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen)
{
-BtPageSet set[1];
-uint slot, idx;
-uint reuse;
-BtKey ptr;
-BtVal val;
-
- while( 1 ) {
- if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
- ptr = keyptr(set->page, slot);
- else
- {
- if( !bt->err )
- bt->err = BTERR_ovflw;
- return bt->err;
- }
-
- // if key already exists, update id and return
-
- if( reuse = !keycmp (ptr, key, keylen) )
- if( val = valptr(set->page, slot), val->len >= vallen ) {
- if( slotptr(set->page, slot)->dead )
- set->page->act++;
- slotptr(set->page, slot)->dead = 0;
- val->len = vallen;
- memcpy (val->value, value, vallen);
- bt_unlockpage(BtLockWrite, set->latch);
- bt_unpinlatch (set->latch);
- bt_unpinpool (set->pool);
- return 0;
- } else {
- if( !slotptr(set->page, slot)->dead )
- set->page->act--;
- slotptr(set->page, slot)->dead = 1;
- set->page->dirty = 1;
- }
-
- // check if page has enough space
-
- if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) )
- break;
-
- if( bt_splitpage (bt, set) )
- return bt->err;
- }
+BtSlot *node;
+uint idx;
- // calculate next available slot and copy key into page
+ // copy value onto page
set->page->min -= vallen + 1; // reset lowest used offset
((unsigned char *)set->page)[set->page->min] = vallen;
memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
+ // copy key onto page
+
set->page->min -= keylen + 1; // reset lowest used offset
((unsigned char *)set->page)[set->page->min] = keylen;
memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
+ // find first empty slot
+
for( idx = slot; idx < set->page->cnt; idx++ )
if( slotptr(set->page, idx)->dead )
break;
// now insert key into array before slot
- if( !reuse && idx == set->page->cnt )
- idx++, set->page->cnt++;
+ if( idx == set->page->cnt )
+ idx += 1, set->page->cnt += 1;
set->page->act++;
while( idx > slot )
- *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
+ *slotptr(set->page, idx) = *slotptr(set->page, idx - 1), idx--;
- slotptr(set->page, slot)->off = set->page->min;
- slotptr(set->page, slot)->dead = 0;
+ // fill in new slot
+
+ node = slotptr(set->page, slot);
+ node->off = set->page->min;
+ node->dead = 0;
bt_unlockpage (BtLockWrite, set->latch);
bt_unpinlatch (set->latch);
return 0;
}
-// cache page of keys into cursor and return starting slot for given key
+// Insert new key into the btree at given level.
+// either add a new key or update/add an existing one
+
+BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
+{
+BtPageSet set[1];
+uint slot, idx;
+uid sequence;
+BtKey ptr;
+BtVal val;
+
+ while( 1 ) { // find the page and slot for the current key
+ if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
+ ptr = keyptr(set->page, slot);
+ else {
+ if( !bt->err )
+ bt->err = BTERR_ovflw;
+ return bt->err;
+ }
+
+ // check for adequate space on the page
+ // and insert the new key before slot.
+
+ if( keycmp (ptr, key, keylen) ) {
+ if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
+ if( bt_splitpage (bt, set) )
+ return bt->err;
+ else
+ continue;
+
+ return bt_insertslot (bt, set, slot, key, keylen, value, vallen);
+ }
+
+ // if key already exists, update value and return
+
+ if( val = valptr(set->page, slot), val->len >= vallen ) {
+ if( slotptr(set->page, slot)->dead )
+ set->page->act++;
+ slotptr(set->page, slot)->dead = 0;
+ set->page->dirty = 1;
+ val->len = vallen;
+ memcpy (val->value, value, vallen);
+ bt_unlockpage(BtLockWrite, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
+ return 0;
+ }
+
+ // new update value doesn't fit in existing value area
+
+ if( !slotptr(set->page, slot)->dead )
+ set->page->dirty = 1;
+ else {
+ slotptr(set->page, slot)->dead = 0;
+ set->page->act++;
+ }
+
+ if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
+ if( bt_splitpage (bt, set) )
+ return bt->err;
+ else
+ continue;
+
+ set->page->min -= vallen + 1;
+ ((unsigned char *)set->page)[set->page->min] = vallen;
+ memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
+
+ set->page->min -= keylen + 1;
+ ((unsigned char *)set->page)[set->page->min] = keylen;
+ memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
+
+ slotptr(set->page, slot)->off = set->page->min;
+ bt_unlockpage(BtLockWrite, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
+ return 0;
+ }
+ return 0;
+}
uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
{
if( *latch->parent->rin & MASK )
fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
- memset ((ushort *)latch->access, 0, sizeof(RWLock));
+ memset ((ushort *)latch->parent, 0, sizeof(RWLock));
if( latch->pin ) {
fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
}
typedef struct {
- char type, idx;
+ char idx;
+ char *type;
char *infile;
BtMgr *mgr;
int num;
bt = bt_open (args->mgr);
- switch(args->type | 0x20)
+ switch(args->type[0] | 0x20)
{
case 'a':
fprintf(stderr, "started latch mgr audit\n");
else if( args->num )
sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
+ if( (args->type[1] | 0x20) == 'p' )
+ len = 10;
if( bt_deletekey (bt, key, len, 0) )
fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
len = 0;
else if( args->num )
sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
+ if( (args->type[1] | 0x20) == 'p' )
+ len = 10;
if( bt_findkey (bt, key, len, NULL, 0) == 0 )
found++;
else if( bt->err )
#endif
args = malloc (cnt * sizeof(ThreadArg));
- mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
+// mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
+ mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, 8191);
if( !mgr ) {
fprintf(stderr, "Index Open Error %s\n", argv[1]);
for( idx = 0; idx < cnt; idx++ ) {
args[idx].infile = argv[idx + 7];
- args[idx].type = argv[2][0];
+ args[idx].type = argv[2];
args[idx].mgr = mgr;
args[idx].num = num;
args[idx].idx = idx;