From: unknown Date: Mon, 8 Sep 2014 21:56:18 +0000 (-0700) Subject: Fix a few bugs X-Git-Url: https://pd.if.org/git/?p=btree;a=commitdiff_plain;h=43dc509851b939b1908da6df4642cb6fd49b7e71 Fix a few bugs --- diff --git a/threadskv1.c b/threadskv1.c index 3ce5b8a..2d95c4a 100644 --- a/threadskv1.c +++ b/threadskv1.c @@ -1,7 +1,7 @@ // btree version threadskv1 sched_yield version // with reworked bt_deletekey code // and phase-fair reader writer lock -// 12 MAR 2014 +// 08 SEP 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -1645,7 +1645,7 @@ uint idx; BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl) { unsigned char lowerfence[256], higherfence[256]; -uint slot, idx, dirty = 0, fence, found; +uint slot, idx, found, fence; BtPageSet set[1], right[1]; unsigned char value[BtId]; BtKey ptr; @@ -1655,7 +1655,7 @@ BtKey ptr; else return bt->err; - // are we deleting a fence slot? + // are we deleting a fence key? fence = slot == set->page->cnt; @@ -1663,11 +1663,11 @@ BtKey ptr; if( found = !keycmp (ptr, key, len) ) if( found = slotptr(set->page, slot)->dead == 0 ) { - dirty = slotptr(set->page, slot)->dead = 1; + slotptr(set->page, slot)->dead = 1; set->page->dirty = 1; set->page->act--; - // collapse empty slots + // collapse empty slots beneath our fence while( idx = set->page->cnt - 1 ) if( slotptr(set->page, idx)->dead ) { @@ -1679,7 +1679,7 @@ BtKey ptr; // did we delete a fence key in an upper level? - if( dirty && lvl && set->page->act && fence ) + if( found && fence && lvl && set->page->act ) if( bt_fixfence (bt, set, lvl) ) return bt->err; else @@ -2057,81 +2057,48 @@ BtVal val; bt_unpinlatch (right->latch); return 0; } -// Insert new key into the btree at given level. +// install new key and value onto page +// page must already be checked for +// adequate space -BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen) +BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen) { -BtPageSet set[1]; -uint slot, idx; -uint reuse; -BtKey ptr; -BtVal val; - - while( 1 ) { - if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) ) - ptr = keyptr(set->page, slot); - else - { - if( !bt->err ) - bt->err = BTERR_ovflw; - return bt->err; - } - - // if key already exists, update value and return - - if( reuse = !keycmp (ptr, key, keylen) ) - if( val = valptr(set->page, slot), val->len >= vallen ) { - if( slotptr(set->page, slot)->dead ) - set->page->act++; - slotptr(set->page, slot)->dead = 0; - val->len = vallen; - memcpy (val->value, value, vallen); - bt_unlockpage(BtLockWrite, set->latch); - bt_unpinlatch (set->latch); - bt_unpinpool (set->pool); - return 0; - } else { - if( !slotptr(set->page, slot)->dead ) - set->page->act--; - slotptr(set->page, slot)->dead = 1; - set->page->dirty = 1; - } - - // check if page has enough space - - if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) ) - break; - - if( bt_splitpage (bt, set) ) - return bt->err; - } +BtSlot *node; +uint idx; - // calculate next available slot and copy key into page + // copy value onto page set->page->min -= vallen + 1; // reset lowest used offset ((unsigned char *)set->page)[set->page->min] = vallen; memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen ); + // copy key onto page + set->page->min -= keylen + 1; // reset lowest used offset ((unsigned char *)set->page)[set->page->min] = keylen; memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen ); + // find first empty slot + for( idx = slot; idx < set->page->cnt; idx++ ) if( slotptr(set->page, idx)->dead ) break; // now insert key into array before slot - if( !reuse && idx == set->page->cnt ) - idx++, set->page->cnt++; + if( idx == set->page->cnt ) + idx += 1, set->page->cnt += 1; set->page->act++; while( idx > slot ) - *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--; + *slotptr(set->page, idx) = *slotptr(set->page, idx - 1), idx--; - slotptr(set->page, slot)->off = set->page->min; - slotptr(set->page, slot)->dead = 0; + // fill in new slot + + node = slotptr(set->page, slot); + node->off = set->page->min; + node->dead = 0; bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (set->latch); @@ -2139,7 +2106,85 @@ BtVal val; return 0; } -// cache page of keys into cursor and return starting slot for given key +// Insert new key into the btree at given level. +// either add a new key or update/add an existing one + +BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen) +{ +BtPageSet set[1]; +uint slot, idx; +uid sequence; +BtKey ptr; +BtVal val; + + while( 1 ) { // find the page and slot for the current key + if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) ) + ptr = keyptr(set->page, slot); + else { + if( !bt->err ) + bt->err = BTERR_ovflw; + return bt->err; + } + + // check for adequate space on the page + // and insert the new key before slot. + + if( keycmp (ptr, key, keylen) ) { + if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) ) + if( bt_splitpage (bt, set) ) + return bt->err; + else + continue; + + return bt_insertslot (bt, set, slot, key, keylen, value, vallen); + } + + // if key already exists, update value and return + + if( val = valptr(set->page, slot), val->len >= vallen ) { + if( slotptr(set->page, slot)->dead ) + set->page->act++; + slotptr(set->page, slot)->dead = 0; + set->page->dirty = 1; + val->len = vallen; + memcpy (val->value, value, vallen); + bt_unlockpage(BtLockWrite, set->latch); + bt_unpinlatch (set->latch); + bt_unpinpool (set->pool); + return 0; + } + + // new update value doesn't fit in existing value area + + if( !slotptr(set->page, slot)->dead ) + set->page->dirty = 1; + else { + slotptr(set->page, slot)->dead = 0; + set->page->act++; + } + + if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) ) + if( bt_splitpage (bt, set) ) + return bt->err; + else + continue; + + set->page->min -= vallen + 1; + ((unsigned char *)set->page)[set->page->min] = vallen; + memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen); + + set->page->min -= keylen + 1; + ((unsigned char *)set->page)[set->page->min] = keylen; + memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen); + + slotptr(set->page, slot)->off = set->page->min; + bt_unlockpage(BtLockWrite, set->latch); + bt_unpinlatch (set->latch); + bt_unpinpool (set->pool); + return 0; + } + return 0; +} uint bt_startkey (BtDb *bt, unsigned char *key, uint len) { @@ -2623,7 +2668,8 @@ BtDb *bt; #endif args = malloc (cnt * sizeof(ThreadArg)); - mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8); +// mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8); + mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, 8191); if( !mgr ) { fprintf(stderr, "Index Open Error %s\n", argv[1]); diff --git a/threadskv3.c b/threadskv3.c index 25501fd..3583b87 100644 --- a/threadskv3.c +++ b/threadskv3.c @@ -2,7 +2,7 @@ // with reworked bt_deletekey code // and phase-fair reader writer lock // and librarian page split code -// 02 SEP 2014 +// 08 SEP 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -147,10 +147,6 @@ typedef struct { // Page key slot definition. -// If BT_maxbits is 15 or less, you can save 2 bytes -// for each key stored by making the two uints -// into ushorts. - // Keys are marked dead, but remain on the page until // it cleanup is called. The fence key (highest key) for // the page is always present, even after cleanup. @@ -1571,14 +1567,11 @@ BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl) unsigned char leftkey[256], rightkey[256]; unsigned char value[BtId]; BtKey ptr; -BtVal val; // remove the old fence value ptr = keyptr(set->page, set->page->cnt); - val = valptr(set->page, set->page->cnt); memcpy (rightkey, ptr, ptr->len + 1); - set->page->garbage += ptr->len + val->len + 2; memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot)); ptr = keyptr(set->page, set->page->cnt); @@ -1648,7 +1641,7 @@ uint idx; BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl) { unsigned char lowerfence[256], higherfence[256]; -uint slot, idx, dirty = 0, fence, found; +uint slot, idx, found, fence; BtPageSet set[1], right[1]; unsigned char value[BtId]; BtKey ptr, tst; @@ -1664,8 +1657,6 @@ BtVal val; if( slotptr(set->page, slot)->librarian ) ptr = keyptr(set->page, ++slot); - // are we deleting a fence slot? - fence = slot == set->page->cnt; // if key is found delete it, otherwise ignore request @@ -1673,11 +1664,11 @@ BtVal val; if( found = !keycmp (ptr, key, len) ) if( found = slotptr(set->page, slot)->dead == 0 ) { val = valptr(set->page,slot); - dirty = slotptr(set->page, slot)->dead = 1; + slotptr(set->page, slot)->dead = 1; set->page->garbage += ptr->len + val->len + 2; set->page->act--; - // collapse empty slots beneath our slot + // collapse empty slots beneath the fence while( idx = set->page->cnt - 1 ) if( slotptr(set->page, idx)->dead ) { @@ -1689,7 +1680,7 @@ BtVal val; // did we delete a fence key in an upper level? - if( dirty && lvl && set->page->act && fence ) + if( found && fence && lvl && set->page->act ) if( bt_fixfence (bt, set, lvl) ) return bt->err; else @@ -1834,13 +1825,13 @@ uint newslot = max; BtKey key; BtVal val; - if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1) + if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1) return slot; // skip cleanup and proceed to split // if there's not enough garbage - if( page->garbage + page->min < 2 * page->act * sizeof(BtSlot) + sizeof(*page) + nxt / 3 ) + if( page->garbage < nxt / 5 ) return 0; memcpy (bt->frame, page, bt->mgr->page_size); @@ -1875,9 +1866,11 @@ BtVal val; // make a librarian slot - slotptr(page, ++idx)->off = nxt; - slotptr(page, idx)->librarian = 1; - slotptr(page, idx)->dead = 1; + if( idx ) { + slotptr(page, ++idx)->off = nxt; + slotptr(page, idx)->librarian = 1; + slotptr(page, idx)->dead = 1; + } // set up the slot @@ -1892,7 +1885,7 @@ BtVal val; // see if page has enough space now, or does it need splitting? - if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 ) + if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 ) return newslot; return 0; @@ -1990,9 +1983,11 @@ BtVal val; // add librarian slot - slotptr(bt->frame, ++idx)->off = nxt; - slotptr(bt->frame, idx)->librarian = 1; - slotptr(bt->frame, idx)->dead = 1; + if( idx ) { + slotptr(bt->frame, ++idx)->off = nxt; + slotptr(bt->frame, idx)->librarian = 1; + slotptr(bt->frame, idx)->dead = 1; + } // add actual slot @@ -2051,9 +2046,11 @@ BtVal val; // add librarian slot - slotptr(set->page, ++idx)->off = nxt; - slotptr(set->page, idx)->librarian = 1; - slotptr(set->page, idx)->dead = 1; + if( idx ) { + slotptr(set->page, ++idx)->off = nxt; + slotptr(set->page, idx)->librarian = 1; + slotptr(set->page, idx)->dead = 1; + } // add actual slot @@ -2104,97 +2101,68 @@ BtVal val; bt_unpinlatch (right->latch); return 0; } -// Insert new key into the btree at given level. - -BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen) -{ -BtPageSet set[1]; -uint slot, idx; -uint reuse; -BtKey ptr; -BtKey tst; -BtVal val; - - while( 1 ) { - if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) ) - ptr = keyptr(set->page, slot); - else { - if( !bt->err ) - bt->err = BTERR_ovflw; - return bt->err; - } - - // if librarian slot == found slot, advance to real slot - if( slotptr(set->page, slot)->librarian ) - if( !keycmp (ptr, key, keylen) ) - ptr = keyptr(set->page, ++slot); +// install new key and value onto page +// page must already be checked for +// adequate space - // if key already exists, update value and return - - if( reuse = !keycmp (ptr, key, keylen) ) - if( val = valptr(set->page, slot), val->len >= vallen ) { - if( slotptr(set->page, slot)->dead ) - set->page->act++; - set->page->garbage += val->len - vallen; - slotptr(set->page, slot)->dead = 0; - val->len = vallen; - memcpy (val->value, value, vallen); - bt_unlockpage(BtLockWrite, set->latch); - bt_unpinlatch (set->latch); - bt_unpinpool (set->pool); - return 0; - } else { - if( !slotptr(set->page, slot)->dead ) { - set->page->garbage += val->len + ptr->len + 2; - set->page->act--; - } - slotptr(set->page, slot)->dead = 1; - } - - // if found slot > desired slot and previous slot - // is a librarian slot, use it - - if( !reuse && slot > 1 ) - if( slotptr(set->page, slot-1)->librarian ) - slot--; - - // check if page has enough space +BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen) +{ +uint idx, librarian; +BtSlot *node; - if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) ) - break; + // if found slot > desired slot and previous slot + // is a librarian slot, use it - if( bt_splitpage (bt, set) ) - return bt->err; - } + if( slot > 1 ) + if( slotptr(set->page, slot-1)->librarian ) + slot--; - // calculate next available slot and copy key into page + // copy value onto page set->page->min -= vallen + 1; // reset lowest used offset ((unsigned char *)set->page)[set->page->min] = vallen; memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen ); + // copy key onto page + set->page->min -= keylen + 1; // reset lowest used offset ((unsigned char *)set->page)[set->page->min] = keylen; memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen ); + // find first empty slot + for( idx = slot; idx < set->page->cnt; idx++ ) if( slotptr(set->page, idx)->dead ) break; // now insert key into array before slot - if( !reuse && idx == set->page->cnt ) - idx++, set->page->cnt++; + if( idx == set->page->cnt ) + idx += 2, set->page->cnt += 2, librarian = 2; + else + librarian = 1; set->page->act++; - while( idx > slot ) - *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--; + while( idx > slot + librarian - 1 ) + *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--; - slotptr(set->page, slot)->off = set->page->min; - slotptr(set->page, slot)->librarian = 0; - slotptr(set->page, slot)->dead = 0; + // add librarian slot + + if( librarian > 1 ) { + node = slotptr(set->page, slot++); + node->off = set->page->min; + node->librarian = 1; + node->dead = 1; + } + + // fill in new slot + + node = slotptr(set->page, slot); + node->off = set->page->min; + node->librarian = 0; + node->dead = 0; bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (set->latch); @@ -2202,6 +2170,93 @@ BtVal val; return 0; } +// Insert new key into the btree at given level. +// either add a new key or update/add an existing one + +BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen) +{ +BtPageSet set[1]; +uint slot, idx; +uid sequence; +BtKey ptr; +BtVal val; +uint type; + + while( 1 ) { // find the page and slot for the current key + if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) ) + ptr = keyptr(set->page, slot); + else { + if( !bt->err ) + bt->err = BTERR_ovflw; + return bt->err; + } + + // if librarian slot == found slot, advance to real slot + + if( slotptr(set->page, slot)->librarian ) + if( !keycmp (ptr, key, keylen) ) + ptr = keyptr(set->page, ++slot); + + // check for adequate space on the page + // and insert the new key before slot. + + if( keycmp (ptr, key, keylen) ) { + if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) ) + if( bt_splitpage (bt, set) ) + return bt->err; + else + continue; + + return bt_insertslot (bt, set, slot, key, keylen, value, vallen); + } + + // if key already exists, update value and return + + if( val = valptr(set->page, slot), val->len >= vallen ) { + if( slotptr(set->page, slot)->dead ) + set->page->act++; + set->page->garbage += val->len - vallen; + slotptr(set->page, slot)->dead = 0; + val->len = vallen; + memcpy (val->value, value, vallen); + bt_unlockpage(BtLockWrite, set->latch); + bt_unpinlatch (set->latch); + bt_unpinpool (set->pool); + return 0; + } + + // new update value doesn't fit in existing value area + + if( !slotptr(set->page, slot)->dead ) + set->page->garbage += val->len + ptr->len + 2; + else { + slotptr(set->page, slot)->dead = 0; + set->page->act++; + } + + if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) ) + if( bt_splitpage (bt, set) ) + return bt->err; + else + continue; + + set->page->min -= vallen + 1; + ((unsigned char *)set->page)[set->page->min] = vallen; + memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen); + + set->page->min -= keylen + 1; + ((unsigned char *)set->page)[set->page->min] = keylen; + memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen); + + slotptr(set->page, slot)->off = set->page->min; + bt_unlockpage(BtLockWrite, set->latch); + bt_unpinlatch (set->latch); + bt_unpinpool (set->pool); + return 0; + } + return 0; +} + // cache page of keys into cursor and return starting slot for given key uint bt_startkey (BtDb *bt, unsigned char *key, uint len) @@ -2518,11 +2573,12 @@ FILE *in; if( bt_deletekey (bt, key, len, 0) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); + found += bt->found; len = 0; } else if( len < 255 ) key[len++] = ch; - fprintf(stderr, "finished %s for keys, %d \n", args->infile, line); + fprintf(stderr, "finished %s for %d keys, found %d \n", args->infile, line, found); break; case 'f': @@ -2559,7 +2615,6 @@ FILE *in; set->latch = bt_pinlatch (bt, page_no); bt_lockpage (BtLockRead, set->latch); next = bt_getid (set->page->right); - cnt += set->page->act; for( slot = 0; slot++ < set->page->cnt; ) if( next || slot < set->page->cnt ) @@ -2567,6 +2622,7 @@ FILE *in; ptr = keyptr(set->page, slot); fwrite (ptr->key, ptr->len, 1, stdout); fputc ('\n', stdout); + cnt++; } bt_unlockpage (BtLockRead, set->latch); @@ -2574,7 +2630,6 @@ FILE *in; bt_unpinpool (set->pool); } while( page_no = next ); - cnt--; // remove stopper key fprintf(stderr, " Total keys read %d\n", cnt); break;