X-Git-Url: https://pd.if.org/git/?p=btree;a=blobdiff_plain;f=threadskv8.c;h=3f669b83d933d83d640cd3fe7079aacae2eb537f;hp=02ac19847c5cbf3eb3cf6991a281049d73b728f1;hb=7a619addda2eab9c15567b62297a612b1d595f2c;hpb=c78ccc15b7098665ae08c2cbe577ad16169da06b diff --git a/threadskv8.c b/threadskv8.c index 02ac198..3f669b8 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -117,6 +117,8 @@ typedef struct { typedef struct { ushort ticket[1]; ushort serving[1]; + ushort tid; + ushort dup; } WOLock; #define PHID 0x1 @@ -411,9 +413,14 @@ uid bt_newdup (BtDb *bt) // Write-Only Queue Lock -void WriteOLock (WOLock *lock) +void WriteOLock (WOLock *lock, ushort tid) { ushort tix; + + if( lock->tid == tid ) { + lock->dup++; + return; + } #ifdef unix tix = __sync_fetch_and_add (lock->ticket, 1); #else @@ -427,10 +434,17 @@ ushort tix; #else SwitchToThread (); #endif + lock->tid = tid; } void WriteORelease (WOLock *lock) { + if( lock->dup ) { + lock->dup--; + return; + } + + lock->tid = 0; lock->serving[0]++; } @@ -1168,10 +1182,14 @@ void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch) WriteLock (latch->access); break; case BtLockParent: - WriteOLock (latch->parent); + WriteOLock (latch->parent, bt->thread_no); break; case BtLockAtomic: - WriteOLock (latch->atomic); + WriteOLock (latch->atomic, bt->thread_no); + break; + case BtLockAtomic | BtLockRead: + WriteOLock (latch->atomic, bt->thread_no); + ReadLock (latch->readwr); break; } } @@ -1199,6 +1217,10 @@ void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch) case BtLockAtomic: WriteORelease (latch->atomic); break; + case BtLockAtomic | BtLockRead: + WriteORelease (latch->atomic); + ReadRelease (latch->readwr); + break; } } @@ -1569,7 +1591,6 @@ BtKey *ptr; bt_unlockpage (bt, BtLockParent, set->latch); bt_unpinlatch (set->latch); - bt->found = 1; return 0; } @@ -1620,7 +1641,7 @@ BtVal *val; if( bt_fixfence (bt, set, lvl) ) return bt->err; else - return bt->found = found, 0; + return 0; // do we need to collapse root? @@ -1628,7 +1649,7 @@ BtVal *val; if( bt_collapseroot (bt, set) ) return bt->err; else - return bt->found = found, 0; + return 0; // delete empty page @@ -1638,7 +1659,7 @@ BtVal *val; set->latch->dirty = 1; bt_unlockpage(bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); - return bt->found = found, 0; + return 0; } BtKey *bt_foundkey (BtDb *bt) @@ -1777,7 +1798,9 @@ BtVal *val; while( cnt++ < max ) { if( cnt == slot ) newslot = idx + 2; - if( cnt < max && slotptr(bt->frame,cnt)->dead ) + + if( cnt < max || bt->frame->lvl ) + if( slotptr(bt->frame,cnt)->dead ) continue; // copy the value across @@ -1794,11 +1817,9 @@ BtVal *val; // make a librarian slot - if( idx ) { - slotptr(page, ++idx)->off = nxt; - slotptr(page, idx)->type = Librarian; - slotptr(page, idx)->dead = 1; - } + slotptr(page, ++idx)->off = nxt; + slotptr(page, idx)->type = Librarian; + slotptr(page, idx)->dead = 1; // set up the slot @@ -1918,8 +1939,10 @@ uint prev; idx = 0; while( cnt++ < max ) { - if( slotptr(set->page, cnt)->dead && cnt < max ) + if( cnt < max || set->page->lvl ) + if( slotptr(set->page, cnt)->dead ) continue; + src = valptr(set->page, cnt); nxt -= src->len + sizeof(BtVal); memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal)); @@ -1931,11 +1954,9 @@ uint prev; // add librarian slot - if( idx ) { - slotptr(bt->frame, ++idx)->off = nxt; - slotptr(bt->frame, idx)->type = Librarian; - slotptr(bt->frame, idx)->dead = 1; - } + slotptr(bt->frame, ++idx)->off = nxt; + slotptr(bt->frame, idx)->type = Librarian; + slotptr(bt->frame, idx)->dead = 1; // add actual slot @@ -1990,11 +2011,9 @@ uint prev; // add librarian slot - if( idx ) { - slotptr(set->page, ++idx)->off = nxt; - slotptr(set->page, idx)->type = Librarian; - slotptr(set->page, idx)->dead = 1; - } + slotptr(set->page, ++idx)->off = nxt; + slotptr(set->page, idx)->type = Librarian; + slotptr(set->page, idx)->dead = 1; // add actual slot @@ -2264,7 +2283,7 @@ typedef struct { uint entry; // latch table entry number uint slot:31; // page slot number uint reuse:1; // reused previous page -} AtomicMod; +} AtomicTxn; typedef struct { uid page_no; // page number for split leaf @@ -2275,81 +2294,10 @@ typedef struct { unsigned char leafkey[BT_keyarray]; } AtomicKey; -// find and load leaf page for given key -// leave page Atomic locked and Read locked. - -int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len) -{ -BtLatchSet *prevlatch; -uid page_no; -uint slot; - - // find level one slot - - if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) ) - return 0; - - // find next non-dead entry on this page - // it will be the fence key if nothing else - - while( slotptr(set->page, slot)->dead ) - if( slot++ < set->page->cnt ) - continue; - else - return bt->err = BTERR_struct, 0; - - page_no = bt_getid(valptr(set->page, slot)->value); - prevlatch = set->latch; - - while( page_no ) { - if( set->latch = bt_pinlatch (bt, page_no, 1) ) - set->page = bt_mappage (bt, set->latch); - else - return 0; - - // obtain read lock using lock chaining with Access mode - // release & unpin parent/left sibling page - - bt_lockpage(bt, BtLockAccess, set->latch); - - bt_unlockpage(bt, BtLockRead, prevlatch); - bt_unpinlatch (prevlatch); - - bt_lockpage(bt, BtLockRead, set->latch); - - // find key on page at this level - // and descend to requested level - - if( !set->page->kill ) - if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) { - bt_unlockpage(bt, BtLockRead, set->latch); - bt_lockpage(bt, BtLockAtomic, set->latch); - bt_lockpage(bt, BtLockRead, set->latch); - bt_unlockpage(bt, BtLockAccess, set->latch); - - if( !set->page->kill ) - if( slot = bt_findslot (set->page, key, len) ) - return slot; - - bt_unlockpage(bt, BtLockAtomic, set->latch); - } - - // slide right into next page - - page_no = bt_getid(set->page->right); - prevlatch = set->latch; - } - - // return error on end of right chain - - bt->err = BTERR_struct; - return 0; // return error -} - // determine actual page where key is located // return slot number -uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set) +uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set) { BtKey *key = keyptr(source,src); uint slot = locks[src].slot; @@ -2366,15 +2314,18 @@ uint entry; return slot; } - // is locks->reuse set? - // if so, find where our key - // is located on previous page or split pages + // is locks->reuse set? or was slot zeroed? + // if so, find where our key is located + // on current page or pages split on + // same page txn operations. do { set->latch = bt->mgr->latchsets + entry; set->page = bt_mappage (bt, set->latch); if( slot = bt_findslot(set->page, key->key, key->len) ) { + if( slotptr(set->page, slot)->type == Librarian ) + slot++; if( locks[src].reuse ) locks[src].entry = entry; return slot; @@ -2385,17 +2336,17 @@ uint entry; return 0; } -BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src) +BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src) { BtKey *key = keyptr(source, src); BtVal *val = valptr(source, src); BtLatchSet *latch; BtPageSet set[1]; -uint entry; +uint entry, slot; - while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) { - if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) ) - return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0); + while( slot = bt_atomicpage (bt, source, locks, src, set) ) { + if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) + return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0); if( entry = bt_splitpage (bt, set) ) latch = bt->mgr->latchsets + entry; @@ -2405,15 +2356,16 @@ uint entry; // splice right page into split chain // and WriteLock it. + bt_lockpage(bt, BtLockWrite, latch); latch->split = set->latch->split; set->latch->split = entry; - bt_lockpage(bt, BtLockWrite, latch); + locks[src].slot = 0; } return bt->err = BTERR_atomic; } -BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src) +BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src) { BtKey *key = keyptr(source, src); uint idx, entry, slot; @@ -2422,16 +2374,23 @@ BtKey *ptr; BtVal *val; if( slot = bt_atomicpage (bt, source, locks, src, set) ) + ptr = keyptr(set->page, slot); + else + return bt->err = BTERR_struct; + + if( !keycmp (ptr, key->key, key->len) ) + if( !slotptr(set->page, slot)->dead ) slotptr(set->page, slot)->dead = 1; + else + return 0; else - return bt->err = BTERR_struct; + return 0; - ptr = keyptr(set->page, slot); val = valptr(set->page, slot); - set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal); set->latch->dirty = 1; set->page->act--; + bt->found++; return 0; } @@ -2534,7 +2493,7 @@ BtPageSet set[1], prev[1]; unsigned char value[BtId]; BtKey *key, *ptr, *key2; BtLatchSet *latch; -AtomicMod *locks; +AtomicTxn *locks; int result = 0; BtSlot temp[1]; BtPage page; @@ -2542,7 +2501,7 @@ BtVal *val; uid right; int type; - locks = calloc (source->cnt + 1, sizeof(AtomicMod)); + locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); head = NULL; tail = NULL; @@ -2578,11 +2537,11 @@ int type; if( samepage = src > 1 ) if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 ) slot = bt_findslot(set->page, key->key, key->len); - else // release read on previous page + else bt_unlockpage(bt, BtLockRead, set->latch); if( !slot ) - if( slot = bt_atomicload(bt, set, key->key, key->len) ) + if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) ) set->latch->split = 0; else return -1; @@ -2631,7 +2590,7 @@ int type; // unlock last loadpage lock - if( source->cnt > 1 ) + if( source->cnt ) bt_unlockpage(bt, BtLockRead, set->latch); // obtain write lock for each master page @@ -3285,7 +3244,7 @@ FILE *in; } else if( len < BT_maxkey ) key[len++] = ch; - fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); + fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found); break; case 'w':