X-Git-Url: https://pd.if.org/git/?p=btree;a=blobdiff_plain;f=threadskv8.c;h=9d17a4a6a99cccb611911c7e6fbb91b36d973e36;hp=44411881f67455309eede48bb81ef3ae46e81d77;hb=HEAD;hpb=7af1d62ea9301c15ee80a428749a17075ba44627 diff --git a/threadskv8.c b/threadskv8.c index 4441188..9d17a4a 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -112,13 +112,6 @@ typedef struct { ushort serving[1]; } RWLock; -// write only queue lock - -typedef struct { - ushort ticket[1]; - ushort serving[1]; -} WOLock; - #define PHID 0x1 #define PRES 0x2 #define MASK 0x3 @@ -141,6 +134,14 @@ volatile typedef struct { #define BOTH 3 #define SHARE 4 +// write only reentrant lock + +typedef struct { + BtSpinLatch xcl[1]; + volatile ushort tid[1]; + volatile ushort dup[1]; +} WOLock; + // hash table entries typedef struct { @@ -409,29 +410,39 @@ uid bt_newdup (BtDb *bt) #endif } -// Write-Only Queue Lock +void bt_spinreleasewrite(BtSpinLatch *latch); +void bt_spinwritelock(BtSpinLatch *latch); -void WriteOLock (WOLock *lock) +// Write-Only Reentrant Lock + +void WriteOLock (WOLock *lock, ushort tid) { -ushort tix; -#ifdef unix - tix = __sync_fetch_and_add (lock->ticket, 1); -#else - tix = _InterlockedExchangeAdd16 (lock->ticket, 1); -#endif - // wait for our ticket to come up + while( 1 ) { + bt_spinwritelock(lock->xcl); - while( tix != lock->serving[0] ) -#ifdef unix - sched_yield(); -#else - SwitchToThread (); -#endif + if( *lock->tid == tid ) { + *lock->dup += 1; + bt_spinreleasewrite(lock->xcl); + return; + } + if( !*lock->tid ) { + *lock->tid = tid; + bt_spinreleasewrite(lock->xcl); + return; + } + bt_spinreleasewrite(lock->xcl); + sched_yield(); + } } void WriteORelease (WOLock *lock) { - lock->serving[0]++; + if( *lock->dup ) { + *lock->dup -= 1; + return; + } + + *lock->tid = 0; } // Phase-Fair reader/writer lock implementation @@ -1168,10 +1179,14 @@ void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch) WriteLock (latch->access); break; case BtLockParent: - WriteOLock (latch->parent); + WriteOLock (latch->parent, bt->thread_no); break; case BtLockAtomic: - WriteOLock (latch->atomic); + WriteOLock (latch->atomic, bt->thread_no); + break; + case BtLockAtomic | BtLockRead: + WriteOLock (latch->atomic, bt->thread_no); + ReadLock (latch->readwr); break; } } @@ -1199,6 +1214,10 @@ void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch) case BtLockAtomic: WriteORelease (latch->atomic); break; + case BtLockAtomic | BtLockRead: + WriteORelease (latch->atomic); + ReadRelease (latch->readwr); + break; } } @@ -1569,7 +1588,6 @@ BtKey *ptr; bt_unlockpage (bt, BtLockParent, set->latch); bt_unpinlatch (set->latch); - bt->found = 1; return 0; } @@ -1620,7 +1638,7 @@ BtVal *val; if( bt_fixfence (bt, set, lvl) ) return bt->err; else - return bt->found = found, 0; + return 0; // do we need to collapse root? @@ -1628,7 +1646,7 @@ BtVal *val; if( bt_collapseroot (bt, set) ) return bt->err; else - return bt->found = found, 0; + return 0; // delete empty page @@ -1638,7 +1656,7 @@ BtVal *val; set->latch->dirty = 1; bt_unlockpage(bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); - return bt->found = found, 0; + return 0; } BtKey *bt_foundkey (BtDb *bt) @@ -1777,7 +1795,9 @@ BtVal *val; while( cnt++ < max ) { if( cnt == slot ) newslot = idx + 2; - if( cnt < max && slotptr(bt->frame,cnt)->dead ) + + if( cnt < max || bt->frame->lvl ) + if( slotptr(bt->frame,cnt)->dead ) continue; // copy the value across @@ -1794,11 +1814,9 @@ BtVal *val; // make a librarian slot - if( idx ) { - slotptr(page, ++idx)->off = nxt; - slotptr(page, idx)->type = Librarian; - slotptr(page, idx)->dead = 1; - } + slotptr(page, ++idx)->off = nxt; + slotptr(page, idx)->type = Librarian; + slotptr(page, idx)->dead = 1; // set up the slot @@ -1918,8 +1936,10 @@ uint prev; idx = 0; while( cnt++ < max ) { - if( slotptr(set->page, cnt)->dead && cnt < max ) + if( cnt < max || set->page->lvl ) + if( slotptr(set->page, cnt)->dead ) continue; + src = valptr(set->page, cnt); nxt -= src->len + sizeof(BtVal); memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal)); @@ -1931,11 +1951,9 @@ uint prev; // add librarian slot - if( idx ) { - slotptr(bt->frame, ++idx)->off = nxt; - slotptr(bt->frame, idx)->type = Librarian; - slotptr(bt->frame, idx)->dead = 1; - } + slotptr(bt->frame, ++idx)->off = nxt; + slotptr(bt->frame, idx)->type = Librarian; + slotptr(bt->frame, idx)->dead = 1; // add actual slot @@ -1990,11 +2008,9 @@ uint prev; // add librarian slot - if( idx ) { - slotptr(set->page, ++idx)->off = nxt; - slotptr(set->page, idx)->type = Librarian; - slotptr(set->page, idx)->dead = 1; - } + slotptr(set->page, ++idx)->off = nxt; + slotptr(set->page, idx)->type = Librarian; + slotptr(set->page, idx)->dead = 1; // add actual slot @@ -2264,7 +2280,7 @@ typedef struct { uint entry; // latch table entry number uint slot:31; // page slot number uint reuse:1; // reused previous page -} AtomicMod; +} AtomicTxn; typedef struct { uid page_no; // page number for split leaf @@ -2275,83 +2291,10 @@ typedef struct { unsigned char leafkey[BT_keyarray]; } AtomicKey; -// find and load leaf page for given key -// leave page Atomic locked and Read locked. - -int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len) -{ -BtLatchSet *prevlatch; -uid page_no; -uint slot; - - // find level one slot - - if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) ) - return 0; - - // find next non-dead entry on this page - // it will be the fence key if nothing else - - while( slotptr(set->page, slot)->dead ) - if( slot++ < set->page->cnt ) - continue; - else - return bt->err = BTERR_struct, 0; - - page_no = bt_getid(valptr(set->page, slot)->value); - prevlatch = set->latch; - - while( page_no ) { - if( set->latch = bt_pinlatch (bt, page_no, 1) ) - set->page = bt_mappage (bt, set->latch); - else - return 0; - - // obtain read lock using lock chaining with Access mode - // release & unpin parent/left sibling page - - bt_lockpage(bt, BtLockAccess, set->latch); - - bt_unlockpage(bt, BtLockRead, prevlatch); - bt_unpinlatch (prevlatch); - - bt_lockpage(bt, BtLockRead, set->latch); - - // find key on page at this level - // and descend to requested level - - if( !set->page->kill ) - if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) { - bt_unlockpage(bt, BtLockRead, set->latch); - bt_lockpage(bt, BtLockAtomic, set->latch); - bt_lockpage(bt, BtLockRead, set->latch); - - if( !set->page->kill ) - if( slot = bt_findslot (set->page, key, len) ) { - bt_unlockpage(bt, BtLockAccess, set->latch); - return slot; - } - - bt_unlockpage(bt, BtLockAtomic, set->latch); - } - - // slide right into next page - - bt_unlockpage(bt, BtLockAccess, set->latch); - page_no = bt_getid(set->page->right); - prevlatch = set->latch; - } - - // return error on end of right chain - - bt->err = BTERR_struct; - return 0; // return error -} - // determine actual page where key is located // return slot number -uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set) +uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set) { BtKey *key = keyptr(source,src); uint slot = locks[src].slot; @@ -2368,15 +2311,18 @@ uint entry; return slot; } - // is locks->reuse set? - // if so, find where our key - // is located on previous page or split pages + // is locks->reuse set? or was slot zeroed? + // if so, find where our key is located + // on current page or pages split on + // same page txn operations. do { set->latch = bt->mgr->latchsets + entry; set->page = bt_mappage (bt, set->latch); if( slot = bt_findslot(set->page, key->key, key->len) ) { + if( slotptr(set->page, slot)->type == Librarian ) + slot++; if( locks[src].reuse ) locks[src].entry = entry; return slot; @@ -2387,17 +2333,17 @@ uint entry; return 0; } -BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src) +BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src) { BtKey *key = keyptr(source, src); BtVal *val = valptr(source, src); BtLatchSet *latch; BtPageSet set[1]; -uint entry; +uint entry, slot; - while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) { - if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) ) - return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0); + while( slot = bt_atomicpage (bt, source, locks, src, set) ) { + if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) + return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0); if( entry = bt_splitpage (bt, set) ) latch = bt->mgr->latchsets + entry; @@ -2407,15 +2353,16 @@ uint entry; // splice right page into split chain // and WriteLock it. + bt_lockpage(bt, BtLockWrite, latch); latch->split = set->latch->split; set->latch->split = entry; - bt_lockpage(bt, BtLockWrite, latch); + locks[src].slot = 0; } return bt->err = BTERR_atomic; } -BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src) +BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src) { BtKey *key = keyptr(source, src); uint idx, entry, slot; @@ -2424,16 +2371,23 @@ BtKey *ptr; BtVal *val; if( slot = bt_atomicpage (bt, source, locks, src, set) ) + ptr = keyptr(set->page, slot); + else + return bt->err = BTERR_struct; + + if( !keycmp (ptr, key->key, key->len) ) + if( !slotptr(set->page, slot)->dead ) slotptr(set->page, slot)->dead = 1; + else + return 0; else - return bt->err = BTERR_struct; + return 0; - ptr = keyptr(set->page, slot); val = valptr(set->page, slot); - set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal); set->latch->dirty = 1; set->page->act--; + bt->found++; return 0; } @@ -2536,7 +2490,7 @@ BtPageSet set[1], prev[1]; unsigned char value[BtId]; BtKey *key, *ptr, *key2; BtLatchSet *latch; -AtomicMod *locks; +AtomicTxn *locks; int result = 0; BtSlot temp[1]; BtPage page; @@ -2544,7 +2498,7 @@ BtVal *val; uid right; int type; - locks = calloc (source->cnt + 1, sizeof(AtomicMod)); + locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); head = NULL; tail = NULL; @@ -2580,11 +2534,11 @@ int type; if( samepage = src > 1 ) if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 ) slot = bt_findslot(set->page, key->key, key->len); - else // release read on previous page + else bt_unlockpage(bt, BtLockRead, set->latch); if( !slot ) - if( slot = bt_atomicload(bt, set, key->key, key->len) ) + if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) ) set->latch->split = 0; else return -1; @@ -2633,7 +2587,7 @@ int type; // unlock last loadpage lock - if( source->cnt > 1 ) + if( source->cnt ) bt_unlockpage(bt, BtLockRead, set->latch); // obtain write lock for each master page @@ -3097,7 +3051,7 @@ uint slot = 0; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->ticket != *latch->parent->serving ) + if( *latch->parent->tid ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->parent, 0, sizeof(RWLock)); @@ -3130,9 +3084,9 @@ BtKey *ptr; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->ticket != *latch->parent->serving ) + if( *latch->parent->tid ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no); - memset ((ushort *)latch->parent, 0, sizeof(RWLock)); + memset ((ushort *)latch->parent, 0, sizeof(WOLock)); if( latch->pin ) { fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no); @@ -3287,7 +3241,7 @@ FILE *in; } else if( len < BT_maxkey ) key[len++] = ch; - fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); + fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found); break; case 'w':