From: unknown Date: Mon, 29 Sep 2014 22:14:51 +0000 (-0700) Subject: Fix bug in bt_cleanpage and bt_splitpage X-Git-Url: https://pd.if.org/git/?p=btree;a=commitdiff_plain;h=9accd7cc4e875f0915128d8d6b09c179fa2896f8 Fix bug in bt_cleanpage and bt_splitpage --- diff --git a/threadskv8.c b/threadskv8.c index 4441188..7cf31e5 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -1777,7 +1777,9 @@ BtVal *val; while( cnt++ < max ) { if( cnt == slot ) newslot = idx + 2; - if( cnt < max && slotptr(bt->frame,cnt)->dead ) + + if( cnt < max || bt->frame->lvl ) + if( slotptr(bt->frame,cnt)->dead ) continue; // copy the value across @@ -1794,11 +1796,9 @@ BtVal *val; // make a librarian slot - if( idx ) { - slotptr(page, ++idx)->off = nxt; - slotptr(page, idx)->type = Librarian; - slotptr(page, idx)->dead = 1; - } + slotptr(page, ++idx)->off = nxt; + slotptr(page, idx)->type = Librarian; + slotptr(page, idx)->dead = 1; // set up the slot @@ -1918,8 +1918,10 @@ uint prev; idx = 0; while( cnt++ < max ) { - if( slotptr(set->page, cnt)->dead && cnt < max ) + if( cnt < max || set->page->lvl ) + if( slotptr(set->page, cnt)->dead ) continue; + src = valptr(set->page, cnt); nxt -= src->len + sizeof(BtVal); memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal)); @@ -1931,11 +1933,9 @@ uint prev; // add librarian slot - if( idx ) { - slotptr(bt->frame, ++idx)->off = nxt; - slotptr(bt->frame, idx)->type = Librarian; - slotptr(bt->frame, idx)->dead = 1; - } + slotptr(bt->frame, ++idx)->off = nxt; + slotptr(bt->frame, idx)->type = Librarian; + slotptr(bt->frame, idx)->dead = 1; // add actual slot @@ -2264,7 +2264,7 @@ typedef struct { uint entry; // latch table entry number uint slot:31; // page slot number uint reuse:1; // reused previous page -} AtomicMod; +} AtomicTxn; typedef struct { uid page_no; // page number for split leaf @@ -2351,7 +2351,7 @@ uint slot; // determine actual page where key is located // return slot number -uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set) +uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set) { BtKey *key = keyptr(source,src); uint slot = locks[src].slot; @@ -2368,15 +2368,18 @@ uint entry; return slot; } - // is locks->reuse set? - // if so, find where our key - // is located on previous page or split pages + // is locks->reuse set? or was slot zeroed? + // if so, find where our key is located + // on current page or pages split on + // same page txn operations. do { set->latch = bt->mgr->latchsets + entry; set->page = bt_mappage (bt, set->latch); if( slot = bt_findslot(set->page, key->key, key->len) ) { + if( slotptr(set->page, slot)->type == Librarian ) + slot++; if( locks[src].reuse ) locks[src].entry = entry; return slot; @@ -2387,17 +2390,17 @@ uint entry; return 0; } -BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src) +BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src) { BtKey *key = keyptr(source, src); BtVal *val = valptr(source, src); BtLatchSet *latch; BtPageSet set[1]; -uint entry; +uint entry, slot; - while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) { - if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) ) - return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0); + while( slot = bt_atomicpage (bt, source, locks, src, set) ) { + if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) + return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0); if( entry = bt_splitpage (bt, set) ) latch = bt->mgr->latchsets + entry; @@ -2407,15 +2410,16 @@ uint entry; // splice right page into split chain // and WriteLock it. + bt_lockpage(bt, BtLockWrite, latch); latch->split = set->latch->split; set->latch->split = entry; - bt_lockpage(bt, BtLockWrite, latch); + locks[src].slot = 0; } return bt->err = BTERR_atomic; } -BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src) +BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src) { BtKey *key = keyptr(source, src); uint idx, entry, slot; @@ -2424,16 +2428,23 @@ BtKey *ptr; BtVal *val; if( slot = bt_atomicpage (bt, source, locks, src, set) ) + ptr = keyptr(set->page, slot); + else + return bt->err = BTERR_struct; + + if( !keycmp (ptr, key->key, key->len) ) + if( !slotptr(set->page, slot)->dead ) slotptr(set->page, slot)->dead = 1; + else + return 0; else - return bt->err = BTERR_struct; + return 0; - ptr = keyptr(set->page, slot); val = valptr(set->page, slot); - set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal); set->latch->dirty = 1; set->page->act--; + bt->found++; return 0; } @@ -2536,7 +2547,7 @@ BtPageSet set[1], prev[1]; unsigned char value[BtId]; BtKey *key, *ptr, *key2; BtLatchSet *latch; -AtomicMod *locks; +AtomicTxn *locks; int result = 0; BtSlot temp[1]; BtPage page; @@ -2544,7 +2555,7 @@ BtVal *val; uid right; int type; - locks = calloc (source->cnt + 1, sizeof(AtomicMod)); + locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); head = NULL; tail = NULL; @@ -3287,7 +3298,7 @@ FILE *in; } else if( len < BT_maxkey ) key[len++] = ch; - fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); + fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found); break; case 'w':