From: unknown Date: Fri, 26 Sep 2014 23:05:39 +0000 (-0700) Subject: add txn level zero page loader X-Git-Url: https://pd.if.org/git/?p=btree;a=commitdiff_plain;h=3f25e478bf0646bc6671869801218d0dab1516f0 add txn level zero page loader --- diff --git a/threadskv8.c b/threadskv8.c index 44793a0..574bbd9 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -155,11 +155,6 @@ typedef struct { uint prev; // prev entry in hash table chain volatile ushort pin; // number of outstanding threads ushort dirty:1; // page in cache is dirty -#ifdef unix - pthread_t atomictid; // pid holding atomic lock -#else - uint atomictid; // pid holding atomic lock -#endif } BtLatchSet; // Define the length of the page record numbers @@ -652,7 +647,6 @@ BtLatchSet *latch = bt->mgr->latchsets + slot; bt->mgr->latchsets[latch->next].prev = slot; bt->mgr->hashtable[hashidx].slot = slot; - memset (&latch->atomictid, 0, sizeof(latch->atomictid)); latch->page_no = page_no; latch->entry = slot; latch->split = 0; @@ -1139,10 +1133,6 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch) case BtLockAtomic: WriteLock (latch->atomic); break; - case BtLockAtomic | BtLockRead: - WriteLock (latch->atomic); - ReadLock (latch->readwr); - break; } } @@ -1167,12 +1157,6 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch) WriteRelease (latch->parent); break; case BtLockAtomic: - memset (&latch->atomictid, 0, sizeof(latch->atomictid)); - WriteRelease (latch->atomic); - break; - case BtLockAtomic | BtLockRead: - ReadRelease (latch->readwr); - memset (&latch->atomictid, 0, sizeof(latch->atomictid)); WriteRelease (latch->atomic); break; } @@ -1293,20 +1277,10 @@ uint mode, prevmode; prevpage = 0; } - // skip Atomic lock on leaf page we need to slide over - - if( !drill ) - if( mode & BtLockAtomic ) - if( pthread_equal (set->latch->atomictid, pthread_self()) ) - mode &= ~BtLockAtomic; - // obtain mode lock using lock chaining through AccessLock bt_lockpage(mode, set->latch); - if( mode & BtLockAtomic ) - set->latch->atomictid = pthread_self(); - if( set->page->free ) return bt->err = BTERR_struct, 0; @@ -1342,11 +1316,13 @@ uint mode, prevmode; if( drill == lvl ) return slot; + // find next non-dead slot -- the fence key if nothing else + while( slotptr(set->page, slot)->dead ) if( slot++ < set->page->cnt ) - continue; + continue; else - goto slideright; + return bt->err = BTERR_struct, 0; page_no = bt_getid(valptr(set->page, slot)->value); drill--; @@ -2263,6 +2239,81 @@ typedef struct { unsigned char leafkey[BT_keyarray]; } AtomicKey; +// find and load leaf page for given key +// leave page Atomic locked and Read locked. + +int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len) +{ +BtLatchSet *prevlatch; +uid page_no; +uint slot; + + // find level zero page + + if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) ) + return 0; + + // find next non-dead entry on this page + // it will be the fence key if nothing else + + while( slotptr(set->page, slot)->dead ) + if( slot++ < set->page->cnt ) + continue; + else + return bt->err = BTERR_struct, 0; + + page_no = bt_getid(valptr(set->page, slot)->value); + prevlatch = set->latch; + + while( page_no ) { + if( set->latch = bt_pinlatch (bt, page_no, 1) ) + set->page = bt_mappage (bt, set->latch); + else + return 0; + + if( set->page->free || set->page->lvl ) + return bt->err = BTERR_struct, 0; + + // obtain read lock using lock chaining with Access mode + // release & unpin parent/left sibling page + + bt_lockpage(BtLockAccess, set->latch); + + bt_unlockpage(BtLockRead, prevlatch); + bt_unpinlatch (prevlatch); + + bt_lockpage(BtLockRead, set->latch); + bt_unlockpage(BtLockAccess, set->latch); + + // find key on page at this level + // and descend to requested level + + if( !set->page->kill ) + if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) { + bt_unlockpage(BtLockRead, set->latch); + bt_lockpage(BtLockAtomic, set->latch); + bt_lockpage(BtLockAccess, set->latch); + bt_lockpage(BtLockRead, set->latch); + bt_unlockpage(BtLockAccess, set->latch); + + if( slot = bt_findslot (set->page, key, len) ) + return slot; + + bt_unlockpage(BtLockAtomic, set->latch); + } + + // slide right into next page + + page_no = bt_getid(set->page->right); + prevlatch = set->latch; + } + + // return error on end of right chain + + bt->err = BTERR_struct; + return 0; // return error +} + // determine actual page where key is located // return slot number @@ -2415,7 +2466,7 @@ int type; bt_unlockpage(BtLockRead, set->latch); if( !slot ) - if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) ) + if( slot = bt_atomicload(bt, set, key->key, key->len) ) set->latch->split = 0; else return -1;