X-Git-Url: https://pd.if.org/git/?p=btree;a=blobdiff_plain;f=threadskv8.c;h=9d17a4a6a99cccb611911c7e6fbb91b36d973e36;hp=6b377106ad1666078fbfa663a520df47ea0b9dc4;hb=HEAD;hpb=429185e31f9e6318ca27fce3a557a73280887af4 diff --git a/threadskv8.c b/threadskv8.c index 6b37710..9d17a4a 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -112,13 +112,6 @@ typedef struct { ushort serving[1]; } RWLock; -// write only queue lock - -typedef struct { - ushort ticket[1]; - ushort serving[1]; -} WOLock; - #define PHID 0x1 #define PRES 0x2 #define MASK 0x3 @@ -141,6 +134,14 @@ volatile typedef struct { #define BOTH 3 #define SHARE 4 +// write only reentrant lock + +typedef struct { + BtSpinLatch xcl[1]; + volatile ushort tid[1]; + volatile ushort dup[1]; +} WOLock; + // hash table entries typedef struct { @@ -409,29 +410,39 @@ uid bt_newdup (BtDb *bt) #endif } -// Write-Only Queue Lock +void bt_spinreleasewrite(BtSpinLatch *latch); +void bt_spinwritelock(BtSpinLatch *latch); + +// Write-Only Reentrant Lock -void WriteOLock (WOLock *lock) +void WriteOLock (WOLock *lock, ushort tid) { -ushort tix; -#ifdef unix - tix = __sync_fetch_and_add (lock->ticket, 1); -#else - tix = _InterlockedExchangeAdd16 (lock->ticket, 1); -#endif - // wait for our ticket to come up + while( 1 ) { + bt_spinwritelock(lock->xcl); - while( tix != lock->serving[0] ) -#ifdef unix - sched_yield(); -#else - SwitchToThread (); -#endif + if( *lock->tid == tid ) { + *lock->dup += 1; + bt_spinreleasewrite(lock->xcl); + return; + } + if( !*lock->tid ) { + *lock->tid = tid; + bt_spinreleasewrite(lock->xcl); + return; + } + bt_spinreleasewrite(lock->xcl); + sched_yield(); + } } void WriteORelease (WOLock *lock) { - lock->serving[0]++; + if( *lock->dup ) { + *lock->dup -= 1; + return; + } + + *lock->tid = 0; } // Phase-Fair reader/writer lock implementation @@ -1168,10 +1179,14 @@ void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch) WriteLock (latch->access); break; case BtLockParent: - WriteOLock (latch->parent); + WriteOLock (latch->parent, bt->thread_no); break; case BtLockAtomic: - WriteOLock (latch->atomic); + WriteOLock (latch->atomic, bt->thread_no); + break; + case BtLockAtomic | BtLockRead: + WriteOLock (latch->atomic, bt->thread_no); + ReadLock (latch->readwr); break; } } @@ -1199,6 +1214,10 @@ void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch) case BtLockAtomic: WriteORelease (latch->atomic); break; + case BtLockAtomic | BtLockRead: + WriteORelease (latch->atomic); + ReadRelease (latch->readwr); + break; } } @@ -1569,7 +1588,6 @@ BtKey *ptr; bt_unlockpage (bt, BtLockParent, set->latch); bt_unpinlatch (set->latch); - bt->found = 1; return 0; } @@ -1620,7 +1638,7 @@ BtVal *val; if( bt_fixfence (bt, set, lvl) ) return bt->err; else - return bt->found = found, 0; + return 0; // do we need to collapse root? @@ -1628,7 +1646,7 @@ BtVal *val; if( bt_collapseroot (bt, set) ) return bt->err; else - return bt->found = found, 0; + return 0; // delete empty page @@ -1638,7 +1656,7 @@ BtVal *val; set->latch->dirty = 1; bt_unlockpage(bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); - return bt->found = found, 0; + return 0; } BtKey *bt_foundkey (BtDb *bt) @@ -2273,79 +2291,6 @@ typedef struct { unsigned char leafkey[BT_keyarray]; } AtomicKey; -// find and load leaf page for given key -// leave page Atomic locked and Read locked. - -int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len) -{ -BtLatchSet *prevlatch; -uid page_no; -uint slot; - - // find level one slot - - if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) ) - return 0; - - // find next non-dead entry on this page - // it will be the fence key if nothing else - - while( slotptr(set->page, slot)->dead ) - if( slot++ < set->page->cnt ) - continue; - else - return bt->err = BTERR_struct, 0; - - page_no = bt_getid(valptr(set->page, slot)->value); - prevlatch = set->latch; - - while( page_no ) { - if( set->latch = bt_pinlatch (bt, page_no, 1) ) - set->page = bt_mappage (bt, set->latch); - else - return 0; - - // obtain read lock using lock chaining with Access mode - // release & unpin parent/left sibling page - - bt_lockpage(bt, BtLockAccess, set->latch); - - bt_unlockpage(bt, BtLockRead, prevlatch); - bt_unpinlatch (prevlatch); - - bt_lockpage(bt, BtLockRead, set->latch); - - // find key on page at this level - // and descend to requested level - - if( !set->page->kill ) - if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) { - bt_unlockpage(bt, BtLockRead, set->latch); - bt_lockpage(bt, BtLockAtomic, set->latch); - bt_lockpage(bt, BtLockRead, set->latch); - - if( !set->page->kill ) - if( slot = bt_findslot (set->page, key, len) ) { - bt_unlockpage(bt, BtLockAccess, set->latch); - return slot; - } - - bt_unlockpage(bt, BtLockAtomic, set->latch); - } - - // slide right into next page - - bt_unlockpage(bt, BtLockAccess, set->latch); - page_no = bt_getid(set->page->right); - prevlatch = set->latch; - } - - // return error on end of right chain - - bt->err = BTERR_struct; - return 0; // return error -} - // determine actual page where key is located // return slot number @@ -2589,11 +2534,11 @@ int type; if( samepage = src > 1 ) if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 ) slot = bt_findslot(set->page, key->key, key->len); - else // release read on previous page + else bt_unlockpage(bt, BtLockRead, set->latch); if( !slot ) - if( slot = bt_atomicload(bt, set, key->key, key->len) ) + if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) ) set->latch->split = 0; else return -1; @@ -2642,7 +2587,7 @@ int type; // unlock last loadpage lock - if( source->cnt > 1 ) + if( source->cnt ) bt_unlockpage(bt, BtLockRead, set->latch); // obtain write lock for each master page @@ -3106,7 +3051,7 @@ uint slot = 0; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->ticket != *latch->parent->serving ) + if( *latch->parent->tid ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->parent, 0, sizeof(RWLock)); @@ -3139,9 +3084,9 @@ BtKey *ptr; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->ticket != *latch->parent->serving ) + if( *latch->parent->tid ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no); - memset ((ushort *)latch->parent, 0, sizeof(RWLock)); + memset ((ushort *)latch->parent, 0, sizeof(WOLock)); if( latch->pin ) { fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);