From 139a1d55e5bd4a7fe1f605c8865fab586ae64a0f Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 24 Sep 2014 08:16:37 -0700 Subject: [PATCH] Fix a few bugs in atomic modification --- threadskv8.c | 143 +++++++++++++++++++++++++++------------------------ 1 file changed, 76 insertions(+), 67 deletions(-) diff --git a/threadskv8.c b/threadskv8.c index d2cb778..3bae89e 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -652,6 +652,7 @@ BtLatchSet *latch = bt->mgr->latchsets + slot; memset (&latch->atomictid, 0, sizeof(latch->atomictid)); latch->page_no = page_no; latch->entry = slot; + latch->split = 0; latch->prev = 0; latch->pin = 1; @@ -1109,62 +1110,62 @@ int ans; // place write, read, or parent lock on requested page_no. -void bt_lockpage(BtLock mode, BtLatchSet *set) +void bt_lockpage(BtLock mode, BtLatchSet *latch) { switch( mode ) { case BtLockRead: - ReadLock (set->readwr); + ReadLock (latch->readwr); break; case BtLockWrite: - WriteLock (set->readwr); + WriteLock (latch->readwr); break; case BtLockAccess: - ReadLock (set->access); + ReadLock (latch->access); break; case BtLockDelete: - WriteLock (set->access); + WriteLock (latch->access); break; case BtLockParent: - WriteLock (set->parent); + WriteLock (latch->parent); break; case BtLockAtomic: - WriteLock (set->atomic); + WriteLock (latch->atomic); break; case BtLockAtomic | BtLockRead: - WriteLock (set->atomic); - ReadLock (set->readwr); + WriteLock (latch->atomic); + ReadLock (latch->readwr); break; } } // remove write, read, or parent lock on requested page -void bt_unlockpage(BtLock mode, BtLatchSet *set) +void bt_unlockpage(BtLock mode, BtLatchSet *latch) { switch( mode ) { case BtLockRead: - ReadRelease (set->readwr); + ReadRelease (latch->readwr); break; case BtLockWrite: - WriteRelease (set->readwr); + WriteRelease (latch->readwr); break; case BtLockAccess: - ReadRelease (set->access); + ReadRelease (latch->access); break; case BtLockDelete: - WriteRelease (set->access); + WriteRelease (latch->access); break; case BtLockParent: - WriteRelease (set->parent); + WriteRelease (latch->parent); break; case BtLockAtomic: - memset (&set->atomictid, 0, sizeof(set->atomictid)); - WriteRelease (set->atomic); + memset (&latch->atomictid, 0, sizeof(latch->atomictid)); + WriteRelease (latch->atomic); break; case BtLockAtomic | BtLockRead: - ReadRelease (set->readwr); - memset (&set->atomictid, 0, sizeof(set->atomictid)); - WriteRelease (set->atomic); + ReadRelease (latch->readwr); + memset (&latch->atomictid, 0, sizeof(latch->atomictid)); + WriteRelease (latch->atomic); break; } } @@ -1219,14 +1220,14 @@ int blk; // find slot in page for given key at a given level -int bt_findslot (BtPageSet *set, unsigned char *key, uint len) +int bt_findslot (BtPage page, unsigned char *key, uint len) { -uint diff, higher = set->page->cnt, low = 1, slot; +uint diff, higher = page->cnt, low = 1, slot; uint good = 0; // make stopper key an infinite fence value - if( bt_getid (set->page->right) ) + if( bt_getid (page->right) ) higher++; else good++; @@ -1239,7 +1240,7 @@ uint good = 0; while( diff = higher - low ) { slot = low + ( diff >> 1 ); - if( keycmp (keyptr(set->page, slot), key, len) < 0 ) + if( keycmp (keyptr(page, slot), key, len) < 0 ) low = slot + 1; else higher = slot, good++; @@ -1329,7 +1330,7 @@ uint mode, prevmode; if( set->page->kill ) goto slideright; - if( slot = bt_findslot (set, key, len) ) { + if( slot = bt_findslot (set->page, key, len) ) { if( drill == lvl ) return slot; @@ -2263,7 +2264,7 @@ uint slot = locks[src].slot; uint entry; if( src > 1 && locks[src].reuse ) - entry = locks[src-1].entry, locks[src].entry = entry, slot = 0; + entry = locks[src-1].entry, slot = 0; else entry = locks[src].entry; @@ -2281,9 +2282,11 @@ uint entry; set->latch = bt->mgr->latchsets + entry; set->page = bt_mappage (bt, set->latch); - if( slot = bt_findslot(set, key->key, key->len) ) - return slot; - + if( slot = bt_findslot(set->page, key->key, key->len) ) { + if( locks[src].reuse ) + locks[src].entry = entry; + return slot; + } } while( entry = set->latch->split ); bt->err = BTERR_atomic; @@ -2296,15 +2299,20 @@ BtKey *key = keyptr(source, src); BtVal *val = valptr(source, src); BtLatchSet *latch; BtPageSet set[1]; -uint slot, entry; +uint entry; + + while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) { + if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) ) + return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0); - while( slot = bt_atomicpage (bt, source, locks, src, set) ) { - if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) - return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0); if( entry = bt_splitpage (bt, set) ) latch = bt->mgr->latchsets + entry; else return bt->err; + + // splice right page into split chain + // and WriteLock it. + latch->split = set->latch->split; set->latch->split = entry; bt_lockpage(BtLockWrite, latch); @@ -2413,22 +2421,22 @@ int type; if( samepage = src > 1 ) if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 ) - slot = bt_findslot(set, key->key, key->len); + slot = bt_findslot(set->page, key->key, key->len); else // release read on previous page bt_unlockpage(BtLockRead, set->latch); - if( !slot ) { - slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead); - set->latch->split = 0; - } - if( !slot ) - return -1; + if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) ) + set->latch->split = 0; + else + return -1; + + if( slotptr(set->page, slot)->type == Librarian ) + ptr = keyptr(set->page, ++slot); + else + ptr = keyptr(set->page, slot); if( !samepage ) { - for(idx = 0; idx++ < source->cnt; ) - if( locks[idx].entry == set->latch->entry ) - abort(); locks[src].entry = set->latch->entry; locks[src].slot = slot; locks[src].reuse = 0; @@ -2438,11 +2446,6 @@ int type; locks[src].reuse = 1; } - if( slotptr(set->page, slot)->type == Librarian ) - ptr = keyptr(set->page, ++slot); - else - ptr = keyptr(set->page, slot); - switch( slotptr(source, src)->type ) { case Duplicate: case Unique: @@ -2491,6 +2494,7 @@ int type; return result; } + } // unlock last loadpage lock @@ -2512,7 +2516,8 @@ int type; return -1; else continue; - else if( bt_atomicinsert (bt, source, locks, src) ) + else + if( bt_atomicinsert (bt, source, locks, src) ) return -1; else continue; @@ -2528,8 +2533,7 @@ int type; prev->latch = bt->mgr->latchsets + locks[src].entry; prev->page = bt_mappage (bt, prev->latch); - // pick-up all splits from first entry - // save page that points to this entry + // pick-up all splits from original page split = next = prev->latch->split; @@ -2540,6 +2544,7 @@ int type; set->latch = bt->mgr->latchsets + entry; set->page = bt_mappage (bt, set->latch); next = set->latch->split; + set->latch->split = 0; // delete empty previous page @@ -2556,8 +2561,7 @@ int type; continue; } - // prev page is not emptied - + // prev page is not emptied locks[src].emptied = 0; // schedule previous fence key update @@ -2575,7 +2579,6 @@ int type; else head = leaf; - latch = prev->latch; tail = leaf; // remove empty block from the split chain @@ -2584,13 +2587,16 @@ int type; memcpy (prev->page->right, set->page->right, BtId); bt_lockpage (BtLockDelete, set->latch); bt_freepage (bt, set); - } else - *prev = *set; + continue; + } - bt_lockpage(BtLockParent, latch); - bt_unlockpage(BtLockWrite, latch); + bt_lockpage(BtLockParent, prev->latch); + bt_unlockpage(BtLockWrite, prev->latch); + *prev = *set; } + // was entire chain emptied? + if( !prev->page->act ) continue; @@ -2627,21 +2633,24 @@ int type; if( locks[src].reuse ) continue; - // delete page emptied by our atomic action - set->latch = bt->mgr->latchsets + locks[src].entry; set->page = bt_mappage (bt, set->latch); - if( locks[src].emptied ) { - bt_unlockpage (BtLockAtomic, set->latch); - if( bt_deletepage (bt, set) ) - return bt->err; - continue; - } + // clear original page split field + split = set->latch->split; + set->latch->split = 0; bt_unlockpage (BtLockAtomic, set->latch); - if( !set->latch->split ) + // delete page emptied by our atomic action + + if( locks[src].emptied ) + if( bt_deletepage (bt, set) ) + return bt->err; + else + continue; + + if( !split ) bt_unpinlatch (set->latch); } -- 2.40.0