X-Git-Url: https://pd.if.org/git/?p=btree;a=blobdiff_plain;f=threadskv1.c;h=2d95c4a6a3a1c543cbacc716950ab561e1516228;hp=1bbe7792aab2643b033cb542d62cd866fe331e16;hb=HEAD;hpb=246c81476d1ada6fc73ffd182cecd5b6192670ef diff --git a/threadskv1.c b/threadskv1.c index 1bbe779..2d95c4a 100644 --- a/threadskv1.c +++ b/threadskv1.c @@ -1,7 +1,7 @@ // btree version threadskv1 sched_yield version // with reworked bt_deletekey code // and phase-fair reader writer lock -// 12 MAR 2014 +// 08 SEP 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -1504,8 +1504,10 @@ BtPool *prevpool; // find key on page at this level // and descend to requested level - if( !set->page->kill ) - if( slot = bt_findslot (set, key, len) ) { + if( set->page->kill ) + goto slideright; + + if( slot = bt_findslot (set, key, len) ) { if( drill == lvl ) return slot; @@ -1518,7 +1520,7 @@ BtPool *prevpool; page_no = bt_getid(valptr(set->page, slot)->value); drill--; continue; - } + } // or slide right into next page @@ -1566,7 +1568,6 @@ BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl) { unsigned char leftkey[256], rightkey[256]; unsigned char value[BtId]; -uid page_no; BtKey ptr; // remove the old fence value @@ -1579,14 +1580,13 @@ BtKey ptr; ptr = keyptr(set->page, set->page->cnt); memcpy (leftkey, ptr, ptr->len + 1); - page_no = set->page_no; bt_lockpage (BtLockParent, set->latch); bt_unlockpage (BtLockWrite, set->latch); // insert new (now smaller) fence key - bt_putid (value, page_no); + bt_putid (value, set->page_no); if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) ) return bt->err; @@ -1645,7 +1645,7 @@ uint idx; BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl) { unsigned char lowerfence[256], higherfence[256]; -uint slot, idx, dirty = 0, fence, found; +uint slot, idx, found, fence; BtPageSet set[1], right[1]; unsigned char value[BtId]; BtKey ptr; @@ -1655,7 +1655,7 @@ BtKey ptr; else return bt->err; - // are we deleting a fence slot? + // are we deleting a fence key? fence = slot == set->page->cnt; @@ -1663,11 +1663,11 @@ BtKey ptr; if( found = !keycmp (ptr, key, len) ) if( found = slotptr(set->page, slot)->dead == 0 ) { - dirty = slotptr(set->page, slot)->dead = 1; + slotptr(set->page, slot)->dead = 1; set->page->dirty = 1; set->page->act--; - // collapse empty slots + // collapse empty slots beneath our fence while( idx = set->page->cnt - 1 ) if( slotptr(set->page, idx)->dead ) { @@ -1679,7 +1679,7 @@ BtKey ptr; // did we delete a fence key in an upper level? - if( dirty && lvl && set->page->act && fence ) + if( found && fence && lvl && set->page->act ) if( bt_fixfence (bt, set, lvl) ) return bt->err; else @@ -1787,8 +1787,8 @@ int ret; else return 0; - // if key exists, return TRUE - // otherwise return FALSE + // if key exists, return >= 0 value bytes copied + // otherwise return (-1) if( !slotptr(set->page, slot)->dead && !keycmp (ptr, key, keylen) ) { val = valptr (set->page,slot); @@ -1844,12 +1844,6 @@ BtVal val; if( cnt < max && slotptr(bt->frame,cnt)->dead ) continue; - // copy the key across - - key = keyptr(bt->frame, cnt); - nxt -= key->len + 1; - memcpy ((unsigned char *)page + nxt, key, key->len + 1); - // copy the value across val = valptr(bt->frame, cnt); @@ -1857,9 +1851,15 @@ BtVal val; ((unsigned char *)page)[nxt] = val->len; memcpy ((unsigned char *)page + nxt + 1, val->value, val->len); + // copy the key across + + key = keyptr(bt->frame, cnt); + nxt -= key->len + 1; + memcpy ((unsigned char *)page + nxt, key, key->len + 1); + // set up the slot - slotptr(page, idx)->off = nxt; + slotptr(page, ++idx)->off = nxt; if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) ) page->act++; @@ -2057,81 +2057,48 @@ BtVal val; bt_unpinlatch (right->latch); return 0; } -// Insert new key into the btree at given level. +// install new key and value onto page +// page must already be checked for +// adequate space -BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen) +BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen) { -BtPageSet set[1]; -uint slot, idx; -uint reuse; -BtKey ptr; -BtVal val; - - while( 1 ) { - if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) ) - ptr = keyptr(set->page, slot); - else - { - if( !bt->err ) - bt->err = BTERR_ovflw; - return bt->err; - } - - // if key already exists, update value and return - - if( reuse = !keycmp (ptr, key, keylen) ) - if( val = valptr(set->page, slot), val->len >= vallen ) { - if( slotptr(set->page, slot)->dead ) - set->page->act++; - slotptr(set->page, slot)->dead = 0; - val->len = vallen; - memcpy (val->value, value, vallen); - bt_unlockpage(BtLockWrite, set->latch); - bt_unpinlatch (set->latch); - bt_unpinpool (set->pool); - return 0; - } else { - if( !slotptr(set->page, slot)->dead ) - set->page->act--; - slotptr(set->page, slot)->dead = 1; - set->page->dirty = 1; - } - - // check if page has enough space - - if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) ) - break; - - if( bt_splitpage (bt, set) ) - return bt->err; - } +BtSlot *node; +uint idx; - // calculate next available slot and copy key into page + // copy value onto page set->page->min -= vallen + 1; // reset lowest used offset ((unsigned char *)set->page)[set->page->min] = vallen; memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen ); + // copy key onto page + set->page->min -= keylen + 1; // reset lowest used offset ((unsigned char *)set->page)[set->page->min] = keylen; memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen ); + // find first empty slot + for( idx = slot; idx < set->page->cnt; idx++ ) if( slotptr(set->page, idx)->dead ) break; // now insert key into array before slot - if( !reuse && idx == set->page->cnt ) - idx++, set->page->cnt++; + if( idx == set->page->cnt ) + idx += 1, set->page->cnt += 1; set->page->act++; while( idx > slot ) - *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--; + *slotptr(set->page, idx) = *slotptr(set->page, idx - 1), idx--; - slotptr(set->page, slot)->off = set->page->min; - slotptr(set->page, slot)->dead = 0; + // fill in new slot + + node = slotptr(set->page, slot); + node->off = set->page->min; + node->dead = 0; bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (set->latch); @@ -2139,7 +2106,85 @@ BtVal val; return 0; } -// cache page of keys into cursor and return starting slot for given key +// Insert new key into the btree at given level. +// either add a new key or update/add an existing one + +BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen) +{ +BtPageSet set[1]; +uint slot, idx; +uid sequence; +BtKey ptr; +BtVal val; + + while( 1 ) { // find the page and slot for the current key + if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) ) + ptr = keyptr(set->page, slot); + else { + if( !bt->err ) + bt->err = BTERR_ovflw; + return bt->err; + } + + // check for adequate space on the page + // and insert the new key before slot. + + if( keycmp (ptr, key, keylen) ) { + if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) ) + if( bt_splitpage (bt, set) ) + return bt->err; + else + continue; + + return bt_insertslot (bt, set, slot, key, keylen, value, vallen); + } + + // if key already exists, update value and return + + if( val = valptr(set->page, slot), val->len >= vallen ) { + if( slotptr(set->page, slot)->dead ) + set->page->act++; + slotptr(set->page, slot)->dead = 0; + set->page->dirty = 1; + val->len = vallen; + memcpy (val->value, value, vallen); + bt_unlockpage(BtLockWrite, set->latch); + bt_unpinlatch (set->latch); + bt_unpinpool (set->pool); + return 0; + } + + // new update value doesn't fit in existing value area + + if( !slotptr(set->page, slot)->dead ) + set->page->dirty = 1; + else { + slotptr(set->page, slot)->dead = 0; + set->page->act++; + } + + if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) ) + if( bt_splitpage (bt, set) ) + return bt->err; + else + continue; + + set->page->min -= vallen + 1; + ((unsigned char *)set->page)[set->page->min] = vallen; + memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen); + + set->page->min -= keylen + 1; + ((unsigned char *)set->page)[set->page->min] = keylen; + memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen); + + slotptr(set->page, slot)->off = set->page->min; + bt_unlockpage(BtLockWrite, set->latch); + bt_unpinlatch (set->latch); + bt_unpinpool (set->pool); + return 0; + } + return 0; +} uint bt_startkey (BtDb *bt, unsigned char *key, uint len) { @@ -2364,7 +2409,8 @@ BtKey ptr; } typedef struct { - char type, idx; + char idx; + char *type; char *infile; BtMgr *mgr; int num; @@ -2391,7 +2437,7 @@ FILE *in; bt = bt_open (args->mgr); - switch(args->type | 0x20) + switch(args->type[0] | 0x20) { case 'a': fprintf(stderr, "started latch mgr audit\n"); @@ -2452,6 +2498,8 @@ FILE *in; else if( args->num ) sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; + if( (args->type[1] | 0x20) == 'p' ) + len = 10; if( bt_deletekey (bt, key, len, 0) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); len = 0; @@ -2474,6 +2522,8 @@ FILE *in; else if( args->num ) sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; + if( (args->type[1] | 0x20) == 'p' ) + len = 10; if( bt_findkey (bt, key, len, NULL, 0) == 0 ) found++; else if( bt->err ) @@ -2618,7 +2668,8 @@ BtDb *bt; #endif args = malloc (cnt * sizeof(ThreadArg)); - mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8); +// mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8); + mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, 8191); if( !mgr ) { fprintf(stderr, "Index Open Error %s\n", argv[1]); @@ -2629,7 +2680,7 @@ BtDb *bt; for( idx = 0; idx < cnt; idx++ ) { args[idx].infile = argv[idx + 7]; - args[idx].type = argv[2][0]; + args[idx].type = argv[2]; args[idx].mgr = mgr; args[idx].num = num; args[idx].idx = idx;