From 7a619addda2eab9c15567b62297a612b1d595f2c Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 29 Sep 2014 22:52:00 -0700 Subject: [PATCH] Add re-entrant WOnly queued lock, remove bt_atomicload --- threadskv8.c | 114 ++++++++++++++------------------------------------- 1 file changed, 31 insertions(+), 83 deletions(-) diff --git a/threadskv8.c b/threadskv8.c index 6b37710..3f669b8 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -117,6 +117,8 @@ typedef struct { typedef struct { ushort ticket[1]; ushort serving[1]; + ushort tid; + ushort dup; } WOLock; #define PHID 0x1 @@ -411,9 +413,14 @@ uid bt_newdup (BtDb *bt) // Write-Only Queue Lock -void WriteOLock (WOLock *lock) +void WriteOLock (WOLock *lock, ushort tid) { ushort tix; + + if( lock->tid == tid ) { + lock->dup++; + return; + } #ifdef unix tix = __sync_fetch_and_add (lock->ticket, 1); #else @@ -427,10 +434,17 @@ ushort tix; #else SwitchToThread (); #endif + lock->tid = tid; } void WriteORelease (WOLock *lock) { + if( lock->dup ) { + lock->dup--; + return; + } + + lock->tid = 0; lock->serving[0]++; } @@ -1168,10 +1182,14 @@ void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch) WriteLock (latch->access); break; case BtLockParent: - WriteOLock (latch->parent); + WriteOLock (latch->parent, bt->thread_no); break; case BtLockAtomic: - WriteOLock (latch->atomic); + WriteOLock (latch->atomic, bt->thread_no); + break; + case BtLockAtomic | BtLockRead: + WriteOLock (latch->atomic, bt->thread_no); + ReadLock (latch->readwr); break; } } @@ -1199,6 +1217,10 @@ void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch) case BtLockAtomic: WriteORelease (latch->atomic); break; + case BtLockAtomic | BtLockRead: + WriteORelease (latch->atomic); + ReadRelease (latch->readwr); + break; } } @@ -1569,7 +1591,6 @@ BtKey *ptr; bt_unlockpage (bt, BtLockParent, set->latch); bt_unpinlatch (set->latch); - bt->found = 1; return 0; } @@ -1620,7 +1641,7 @@ BtVal *val; if( bt_fixfence (bt, set, lvl) ) return bt->err; else - return bt->found = found, 0; + return 0; // do we need to collapse root? @@ -1628,7 +1649,7 @@ BtVal *val; if( bt_collapseroot (bt, set) ) return bt->err; else - return bt->found = found, 0; + return 0; // delete empty page @@ -1638,7 +1659,7 @@ BtVal *val; set->latch->dirty = 1; bt_unlockpage(bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); - return bt->found = found, 0; + return 0; } BtKey *bt_foundkey (BtDb *bt) @@ -2273,79 +2294,6 @@ typedef struct { unsigned char leafkey[BT_keyarray]; } AtomicKey; -// find and load leaf page for given key -// leave page Atomic locked and Read locked. - -int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len) -{ -BtLatchSet *prevlatch; -uid page_no; -uint slot; - - // find level one slot - - if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) ) - return 0; - - // find next non-dead entry on this page - // it will be the fence key if nothing else - - while( slotptr(set->page, slot)->dead ) - if( slot++ < set->page->cnt ) - continue; - else - return bt->err = BTERR_struct, 0; - - page_no = bt_getid(valptr(set->page, slot)->value); - prevlatch = set->latch; - - while( page_no ) { - if( set->latch = bt_pinlatch (bt, page_no, 1) ) - set->page = bt_mappage (bt, set->latch); - else - return 0; - - // obtain read lock using lock chaining with Access mode - // release & unpin parent/left sibling page - - bt_lockpage(bt, BtLockAccess, set->latch); - - bt_unlockpage(bt, BtLockRead, prevlatch); - bt_unpinlatch (prevlatch); - - bt_lockpage(bt, BtLockRead, set->latch); - - // find key on page at this level - // and descend to requested level - - if( !set->page->kill ) - if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) { - bt_unlockpage(bt, BtLockRead, set->latch); - bt_lockpage(bt, BtLockAtomic, set->latch); - bt_lockpage(bt, BtLockRead, set->latch); - - if( !set->page->kill ) - if( slot = bt_findslot (set->page, key, len) ) { - bt_unlockpage(bt, BtLockAccess, set->latch); - return slot; - } - - bt_unlockpage(bt, BtLockAtomic, set->latch); - } - - // slide right into next page - - bt_unlockpage(bt, BtLockAccess, set->latch); - page_no = bt_getid(set->page->right); - prevlatch = set->latch; - } - - // return error on end of right chain - - bt->err = BTERR_struct; - return 0; // return error -} - // determine actual page where key is located // return slot number @@ -2589,11 +2537,11 @@ int type; if( samepage = src > 1 ) if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 ) slot = bt_findslot(set->page, key->key, key->len); - else // release read on previous page + else bt_unlockpage(bt, BtLockRead, set->latch); if( !slot ) - if( slot = bt_atomicload(bt, set, key->key, key->len) ) + if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) ) set->latch->split = 0; else return -1; @@ -2642,7 +2590,7 @@ int type; // unlock last loadpage lock - if( source->cnt > 1 ) + if( source->cnt ) bt_unlockpage(bt, BtLockRead, set->latch); // obtain write lock for each master page -- 2.40.0