// with reworked bt_deletekey code
// and phase-fair reader writer lock
// and librarian page split code
-// 02 SEP 2014
+// 08 SEP 2014
// author: karl malbrain, malbrain@cal.berkeley.edu
// Page key slot definition.
-// If BT_maxbits is 15 or less, you can save 2 bytes
-// for each key stored by making the two uints
-// into ushorts.
-
// Keys are marked dead, but remain on the page until
// it cleanup is called. The fence key (highest key) for
// the page is always present, even after cleanup.
unsigned char leftkey[256], rightkey[256];
unsigned char value[BtId];
BtKey ptr;
-BtVal val;
// remove the old fence value
ptr = keyptr(set->page, set->page->cnt);
- val = valptr(set->page, set->page->cnt);
memcpy (rightkey, ptr, ptr->len + 1);
- set->page->garbage += ptr->len + val->len + 2;
memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
ptr = keyptr(set->page, set->page->cnt);
BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
{
unsigned char lowerfence[256], higherfence[256];
-uint slot, idx, dirty = 0, fence, found;
+uint slot, idx, found, fence;
BtPageSet set[1], right[1];
unsigned char value[BtId];
BtKey ptr, tst;
if( slotptr(set->page, slot)->librarian )
ptr = keyptr(set->page, ++slot);
- // are we deleting a fence slot?
-
fence = slot == set->page->cnt;
// if key is found delete it, otherwise ignore request
if( found = !keycmp (ptr, key, len) )
if( found = slotptr(set->page, slot)->dead == 0 ) {
val = valptr(set->page,slot);
- dirty = slotptr(set->page, slot)->dead = 1;
+ slotptr(set->page, slot)->dead = 1;
set->page->garbage += ptr->len + val->len + 2;
set->page->act--;
- // collapse empty slots beneath our slot
+ // collapse empty slots beneath the fence
while( idx = set->page->cnt - 1 )
if( slotptr(set->page, idx)->dead ) {
// did we delete a fence key in an upper level?
- if( dirty && lvl && set->page->act && fence )
+ if( found && fence && lvl && set->page->act )
if( bt_fixfence (bt, set, lvl) )
return bt->err;
else
BtKey key;
BtVal val;
- if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
+ if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
return slot;
// skip cleanup and proceed to split
// if there's not enough garbage
- if( page->garbage + page->min < 2 * page->act * sizeof(BtSlot) + sizeof(*page) + nxt / 3 )
+ if( page->garbage < nxt / 5 )
return 0;
memcpy (bt->frame, page, bt->mgr->page_size);
// make a librarian slot
- slotptr(page, ++idx)->off = nxt;
- slotptr(page, idx)->librarian = 1;
- slotptr(page, idx)->dead = 1;
+ if( idx ) {
+ slotptr(page, ++idx)->off = nxt;
+ slotptr(page, idx)->librarian = 1;
+ slotptr(page, idx)->dead = 1;
+ }
// set up the slot
// see if page has enough space now, or does it need splitting?
- if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
+ if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
return newslot;
return 0;
// add librarian slot
- slotptr(bt->frame, ++idx)->off = nxt;
- slotptr(bt->frame, idx)->librarian = 1;
- slotptr(bt->frame, idx)->dead = 1;
+ if( idx ) {
+ slotptr(bt->frame, ++idx)->off = nxt;
+ slotptr(bt->frame, idx)->librarian = 1;
+ slotptr(bt->frame, idx)->dead = 1;
+ }
// add actual slot
// add librarian slot
- slotptr(set->page, ++idx)->off = nxt;
- slotptr(set->page, idx)->librarian = 1;
- slotptr(set->page, idx)->dead = 1;
+ if( idx ) {
+ slotptr(set->page, ++idx)->off = nxt;
+ slotptr(set->page, idx)->librarian = 1;
+ slotptr(set->page, idx)->dead = 1;
+ }
// add actual slot
bt_unpinlatch (right->latch);
return 0;
}
-// Insert new key into the btree at given level.
-
-BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
-{
-BtPageSet set[1];
-uint slot, idx;
-uint reuse;
-BtKey ptr;
-BtKey tst;
-BtVal val;
-
- while( 1 ) {
- if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
- ptr = keyptr(set->page, slot);
- else {
- if( !bt->err )
- bt->err = BTERR_ovflw;
- return bt->err;
- }
-
- // if librarian slot == found slot, advance to real slot
- if( slotptr(set->page, slot)->librarian )
- if( !keycmp (ptr, key, keylen) )
- ptr = keyptr(set->page, ++slot);
+// install new key and value onto page
+// page must already be checked for
+// adequate space
- // if key already exists, update value and return
-
- if( reuse = !keycmp (ptr, key, keylen) )
- if( val = valptr(set->page, slot), val->len >= vallen ) {
- if( slotptr(set->page, slot)->dead )
- set->page->act++;
- set->page->garbage += val->len - vallen;
- slotptr(set->page, slot)->dead = 0;
- val->len = vallen;
- memcpy (val->value, value, vallen);
- bt_unlockpage(BtLockWrite, set->latch);
- bt_unpinlatch (set->latch);
- bt_unpinpool (set->pool);
- return 0;
- } else {
- if( !slotptr(set->page, slot)->dead ) {
- set->page->garbage += val->len + ptr->len + 2;
- set->page->act--;
- }
- slotptr(set->page, slot)->dead = 1;
- }
-
- // if found slot > desired slot and previous slot
- // is a librarian slot, use it
-
- if( !reuse && slot > 1 )
- if( slotptr(set->page, slot-1)->librarian )
- slot--;
-
- // check if page has enough space
+BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen)
+{
+uint idx, librarian;
+BtSlot *node;
- if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) )
- break;
+ // if found slot > desired slot and previous slot
+ // is a librarian slot, use it
- if( bt_splitpage (bt, set) )
- return bt->err;
- }
+ if( slot > 1 )
+ if( slotptr(set->page, slot-1)->librarian )
+ slot--;
- // calculate next available slot and copy key into page
+ // copy value onto page
set->page->min -= vallen + 1; // reset lowest used offset
((unsigned char *)set->page)[set->page->min] = vallen;
memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
+ // copy key onto page
+
set->page->min -= keylen + 1; // reset lowest used offset
((unsigned char *)set->page)[set->page->min] = keylen;
memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
+ // find first empty slot
+
for( idx = slot; idx < set->page->cnt; idx++ )
if( slotptr(set->page, idx)->dead )
break;
// now insert key into array before slot
- if( !reuse && idx == set->page->cnt )
- idx++, set->page->cnt++;
+ if( idx == set->page->cnt )
+ idx += 2, set->page->cnt += 2, librarian = 2;
+ else
+ librarian = 1;
set->page->act++;
- while( idx > slot )
- *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
+ while( idx > slot + librarian - 1 )
+ *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
- slotptr(set->page, slot)->off = set->page->min;
- slotptr(set->page, slot)->librarian = 0;
- slotptr(set->page, slot)->dead = 0;
+ // add librarian slot
+
+ if( librarian > 1 ) {
+ node = slotptr(set->page, slot++);
+ node->off = set->page->min;
+ node->librarian = 1;
+ node->dead = 1;
+ }
+
+ // fill in new slot
+
+ node = slotptr(set->page, slot);
+ node->off = set->page->min;
+ node->librarian = 0;
+ node->dead = 0;
bt_unlockpage (BtLockWrite, set->latch);
bt_unpinlatch (set->latch);
return 0;
}
+// Insert new key into the btree at given level.
+// either add a new key or update/add an existing one
+
+BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
+{
+BtPageSet set[1];
+uint slot, idx;
+uid sequence;
+BtKey ptr;
+BtVal val;
+uint type;
+
+ while( 1 ) { // find the page and slot for the current key
+ if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
+ ptr = keyptr(set->page, slot);
+ else {
+ if( !bt->err )
+ bt->err = BTERR_ovflw;
+ return bt->err;
+ }
+
+ // if librarian slot == found slot, advance to real slot
+
+ if( slotptr(set->page, slot)->librarian )
+ if( !keycmp (ptr, key, keylen) )
+ ptr = keyptr(set->page, ++slot);
+
+ // check for adequate space on the page
+ // and insert the new key before slot.
+
+ if( keycmp (ptr, key, keylen) ) {
+ if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
+ if( bt_splitpage (bt, set) )
+ return bt->err;
+ else
+ continue;
+
+ return bt_insertslot (bt, set, slot, key, keylen, value, vallen);
+ }
+
+ // if key already exists, update value and return
+
+ if( val = valptr(set->page, slot), val->len >= vallen ) {
+ if( slotptr(set->page, slot)->dead )
+ set->page->act++;
+ set->page->garbage += val->len - vallen;
+ slotptr(set->page, slot)->dead = 0;
+ val->len = vallen;
+ memcpy (val->value, value, vallen);
+ bt_unlockpage(BtLockWrite, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
+ return 0;
+ }
+
+ // new update value doesn't fit in existing value area
+
+ if( !slotptr(set->page, slot)->dead )
+ set->page->garbage += val->len + ptr->len + 2;
+ else {
+ slotptr(set->page, slot)->dead = 0;
+ set->page->act++;
+ }
+
+ if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
+ if( bt_splitpage (bt, set) )
+ return bt->err;
+ else
+ continue;
+
+ set->page->min -= vallen + 1;
+ ((unsigned char *)set->page)[set->page->min] = vallen;
+ memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
+
+ set->page->min -= keylen + 1;
+ ((unsigned char *)set->page)[set->page->min] = keylen;
+ memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
+
+ slotptr(set->page, slot)->off = set->page->min;
+ bt_unlockpage(BtLockWrite, set->latch);
+ bt_unpinlatch (set->latch);
+ bt_unpinpool (set->pool);
+ return 0;
+ }
+ return 0;
+}
+
// cache page of keys into cursor and return starting slot for given key
uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
if( bt_deletekey (bt, key, len, 0) )
fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
+ found += bt->found;
len = 0;
}
else if( len < 255 )
key[len++] = ch;
- fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
+ fprintf(stderr, "finished %s for %d keys, found %d \n", args->infile, line, found);
break;
case 'f':
set->latch = bt_pinlatch (bt, page_no);
bt_lockpage (BtLockRead, set->latch);
next = bt_getid (set->page->right);
- cnt += set->page->act;
for( slot = 0; slot++ < set->page->cnt; )
if( next || slot < set->page->cnt )
ptr = keyptr(set->page, slot);
fwrite (ptr->key, ptr->len, 1, stdout);
fputc ('\n', stdout);
+ cnt++;
}
bt_unlockpage (BtLockRead, set->latch);
bt_unpinpool (set->pool);
} while( page_no = next );
- cnt--; // remove stopper key
fprintf(stderr, " Total keys read %d\n", cnt);
break;