BtPage page;
page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
-// madvise (page, bt->mgr->page_size, MADV_WILLNEED);
return page;
}
}
// allocate a new page
+// return with page pinned.
BTERR bt_newpage(BtDb *bt, BtPageSet *set)
{
bt_putid(right2->page->left, left_page_no);
bt_unlockpage (BtLockWrite, right2->latch);
bt_unpinlatch (right2->latch);
+ bt_unpinpool (right2->pool);
return 0;
}
BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
{
unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
-BtPageSet set[1], right[1], right2[1];
uint slot, idx, found, fence;
+BtPageSet set[1], right[1];
unsigned char value[BtId];
BtKey *ptr, *tst;
BtVal *val;
if( slotptr(set->page, slot)->type == Duplicate )
len -= BtId;
+ // not there if we reach the stopper key
+
+ if( slot == set->page->cnt )
+ if( !bt_getid (set->page->right) )
+ break;
+
// if key exists, return >= 0 value bytes copied
// otherwise return (-1)
while( cnt++ < max ) {
if( cnt == slot )
newslot = idx + 2;
- if( cnt < max && slotptr(bt->frame,cnt)->dead )
+
+ if( cnt < max || page->lvl )
+ if( slotptr(bt->frame,cnt)->dead )
continue;
// copy the value across
val = valptr(bt->frame, cnt);
nxt -= val->len + sizeof(BtVal);
- ((unsigned char *)page)[nxt] = val->len;
- memcpy ((unsigned char *)page + nxt + sizeof(BtVal), val->value, val->len);
+ memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
// copy the key across
// make a librarian slot
- if( idx ) {
- slotptr(page, ++idx)->off = nxt;
- slotptr(page, idx)->type = Librarian;
- slotptr(page, idx)->dead = 1;
- }
+ slotptr(page, ++idx)->off = nxt;
+ slotptr(page, idx)->type = Librarian;
+ slotptr(page, idx)->dead = 1;
// set up the slot
idx = 0;
while( cnt++ < max ) {
- if( slotptr(set->page, cnt)->dead && cnt < max )
+ if( cnt < max || set->page->lvl )
+ if( slotptr(set->page, cnt)->dead )
continue;
src = valptr(set->page, cnt);
nxt -= src->len + sizeof(BtVal);
- val = (BtVal*)((unsigned char *)bt->frame + nxt);
- memcpy (val->value, src->value, src->len);
- val->len = src->len;
+ memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
key = keyptr(set->page, cnt);
nxt -= key->len + sizeof(BtKey);
- ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
- memcpy (ptr, key, key->len + sizeof(BtKey));
+ memcpy ((unsigned char *)bt->frame + nxt, key, key->len + sizeof(BtVal));
// add librarian slot
- if( idx ) {
- slotptr(bt->frame, ++idx)->off = nxt;
- slotptr(bt->frame, idx)->type = Librarian;
- slotptr(bt->frame, idx)->dead = 1;
- }
+ slotptr(bt->frame, ++idx)->off = nxt;
+ slotptr(bt->frame, idx)->type = Librarian;
+ slotptr(bt->frame, idx)->dead = 1;
// add actual slot
return bt->err;
memcpy (right->page, bt->frame, bt->mgr->page_size);
- bt_unpinpool (right->pool);
// update lower keys to continue in old page
while( cnt++ < max ) {
if( slotptr(bt->frame, cnt)->dead )
continue;
- val = valptr(bt->frame, cnt);
- nxt -= val->len + sizeof(BtVal);
- ((unsigned char *)set->page)[nxt] = val->len;
- memcpy ((unsigned char *)set->page + nxt + sizeof(BtVal), val->value, val->len);
+ src = valptr(bt->frame, cnt);
+ nxt -= src->len + sizeof(BtVal);
+ memcpy ((unsigned char *)set->page + nxt, src, src->len + sizeof(BtVal));
key = keyptr(bt->frame, cnt);
nxt -= key->len + sizeof(BtKey);
// add librarian slot
- if( idx ) {
- slotptr(set->page, ++idx)->off = nxt;
- slotptr(set->page, idx)->type = Librarian;
- slotptr(set->page, idx)->dead = 1;
- }
+ slotptr(set->page, ++idx)->off = nxt;
+ slotptr(set->page, idx)->type = Librarian;
+ slotptr(set->page, idx)->dead = 1;
// add actual slot
// insert new fence for reformulated left block of smaller keys
bt_putid (value, set->page_no);
+ ptr = (BtKey*)fencekey;
- if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
+ if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
return bt->err;
// switch fence for right block of larger keys to new right page
bt_putid (value, right->page_no);
+ ptr = (BtKey*)rightkey;
- if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
+ if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
return bt->err;
bt_unlockpage (BtLockParent, set->latch);
bt_unlockpage (BtLockParent, right->latch);
bt_unpinlatch (right->latch);
+ bt_unpinpool (right->pool);
return 0;
}
}
#endif
+void bt_poolaudit (BtMgr *mgr)
+{
+uint slot = 0;
+BtPool *pool;
+
+ while( slot++ < mgr->poolcnt ) {
+ pool = mgr->pool + slot;
+ if( pool->pin & ~CLOCK_bit )
+ fprintf(stderr, "pool slot %d pinned\n", slot);
+ }
+}
+
uint bt_latchaudit (BtDb *bt)
{
ushort idx, hashidx;
if( !bt->frame->free ) {
for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
ptr = keyptr(bt->frame, idx+1);
- if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
+ if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) > 0 )
fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
}
if( !bt->frame->lvl )
}
else if( len < BT_maxkey )
key[len++] = ch;
+
fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
break;
set->page = bt_page (bt, set->pool, page_no);
else
break;
+ madvise (set->page, bt->mgr->page_size, MADV_WILLNEED);
set->latch = bt_pinlatch (bt, page_no);
bt_lockpage (BtLockRead, set->latch);
next = bt_getid (set->page->right);
elapsed = getCpuTime(2);
fprintf(stderr, " sys %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
+ bt_poolaudit (mgr);
bt_mgrclose (mgr);
}