From: unknown Date: Wed, 15 Oct 2014 05:58:24 +0000 (-0700) Subject: Fix broken WriteOnly reentrant locks X-Git-Url: https://pd.if.org/git/?p=btree;a=commitdiff_plain;h=dda5ade3ade141c403b5e70deebbcf28aeed3247 Fix broken WriteOnly reentrant locks --- diff --git a/threadskv10.c b/threadskv10.c index 62ee44e..6ded2da 100644 --- a/threadskv10.c +++ b/threadskv10.c @@ -113,15 +113,15 @@ typedef enum{ typedef struct { union { struct { - volatile uint xlock:1; // one writer has exclusive lock - volatile uint wrt:31; // count of other writers waiting + volatile ushort xlock; // one writer has exclusive lock + volatile ushort wrt; // count of other writers waiting } bits[1]; uint value[1]; }; } BtMutexLatch; #define XCL 1 -#define WRT 2 +#define WRT 65536 // definition for phase-fair reader/writer lock implementation @@ -130,8 +130,6 @@ typedef struct { volatile ushort rout[1]; volatile ushort ticket[1]; volatile ushort serving[1]; - volatile ushort tid[1]; - volatile ushort dup[1]; } RWLock; // write only lock @@ -320,6 +318,24 @@ typedef struct { unsigned char key[BT_keyarray]; // last found complete key } BtDb; +// atomic txn structures + +typedef struct { + logseqno reqlsn; // redo log seq no required + uint entry; // latch table entry number + uint slot:31; // page slot number + uint reuse:1; // reused previous page +} AtomicTxn; + +typedef struct { + uid page_no; // page number for split leaf + void *next; // next key to insert + uint entry:29; // latch table entry number + uint type:2; // 0 == insert, 1 == delete, 2 == free + uint nounlock:1; // don't unlock ParentModification + unsigned char leafkey[BT_keyarray]; +} AtomicKey; + // Catastrophic errors typedef enum { @@ -383,7 +399,8 @@ extern void bt_mgrclose (BtMgr *mgr); extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no); extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no); -// transaction functions +// atomic transaction functions +BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no); BTERR bt_txnpromote (BtDb *bt); // The page is allocated from low and hi ends. @@ -470,7 +487,7 @@ uint slept = 0; while( 1 ) { *prev->value = __sync_fetch_and_or(latch->value, XCL); - if( !prev->bits->xlock ) { // did we set XCL bit? + if( !prev->bits->xlock ) { // did we set XCL? if( slept ) __sync_fetch_and_sub(latch->value, WRT); return; @@ -481,7 +498,7 @@ uint slept = 0; __sync_fetch_and_add(latch->value, WRT); } - sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr); + sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr); slept = 1; } } @@ -497,7 +514,7 @@ BtMutexLatch prev[1]; *prev->value = __sync_fetch_and_or(latch->value, XCL); - // take write access if exclusive bit is clear + // take write access if exclusive bit was clear return !prev->bits->xlock; } @@ -511,20 +528,29 @@ BtMutexLatch prev[1]; *prev->value = __sync_fetch_and_and(latch->value, ~XCL); if( prev->bits->wrt ) - sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ); + sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr ); } // Write-Only Queue Lock void WriteOLock (WOLock *lock, ushort tid) { + while( 1 ) { + bt_mutexlock(lock->xcl); + if( *lock->tid == tid ) { *lock->dup += 1; + bt_releasemutex(lock->xcl); return; } - - bt_mutexlock(lock->xcl); - *lock->tid = tid; + if( !*lock->tid ) { + *lock->tid = tid; + bt_releasemutex(lock->xcl); + return; + } + bt_releasemutex(lock->xcl); + sched_yield(); + } } void WriteORelease (WOLock *lock) @@ -535,7 +561,6 @@ void WriteORelease (WOLock *lock) } *lock->tid = 0; - bt_releasemutex(lock->xcl); } // Phase-Fair reader/writer lock implementation @@ -544,10 +569,6 @@ void WriteLock (RWLock *lock, ushort tid) { ushort w, r, tix; - if( *lock->tid == tid ) { - *lock->dup += 1; - return; - } #ifdef unix tix = __sync_fetch_and_add (lock->ticket, 1); #else @@ -574,17 +595,10 @@ ushort w, r, tix; #else SwitchToThread(); #endif - *lock->tid = tid; } void WriteRelease (RWLock *lock) { - if( *lock->dup ) { - *lock->dup -= 1; - return; - } - - *lock->tid = 0; #ifdef unix __sync_fetch_and_and (lock->rin, ~MASK); #else @@ -600,12 +614,6 @@ int ReadTry (RWLock *lock, ushort tid) { ushort w; - // OK if write lock already held by same thread - - if( *lock->tid == tid ) { - *lock->dup += 1; - return 1; - } #ifdef unix w = __sync_fetch_and_add (lock->rin, RINC) & MASK; #else @@ -628,10 +636,6 @@ void ReadLock (RWLock *lock, ushort tid) { ushort w; - if( *lock->tid == tid ) { - *lock->dup += 1; - return; - } #ifdef unix w = __sync_fetch_and_add (lock->rin, RINC) & MASK; #else @@ -648,11 +652,6 @@ ushort w; void ReadRelease (RWLock *lock) { - if( *lock->dup ) { - *lock->dup -= 1; - return; - } - #ifdef unix __sync_fetch_and_add (lock->rout, RINC); #else @@ -781,7 +780,7 @@ uint last, end; last = mgr->redolast & ~0xfff; end = mgr->redoend; - if( end - last + sizeof(BtLogHdr) >= 8192 ) + if( end - last + sizeof(BtLogHdr) >= 32768 ) if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 ) fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); else @@ -873,7 +872,7 @@ BtVal *val; last = mgr->redolast & ~0xfff; end = mgr->redoend; - if( end - last + sizeof(BtLogHdr) >= 8192 ) + if( end - last + sizeof(BtLogHdr) >= 32768 ) if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 ) fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); else @@ -883,6 +882,25 @@ BtVal *val; return lsn; } +// sync a single btree page to disk + +BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch) +{ +uint segment = latch->page_no >> 16; +BtPage perm; + + if( bt_writepage (mgr, page, latch->page_no) ) + return mgr->err; + + perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits)); + + if( msync (perm, mgr->page_size, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); + + latch->dirty = 0; + return 0; +} + // read page into buffer pool from permanent location in Btree file BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no) @@ -2432,6 +2450,7 @@ uint idx, librarian; BtSlot *node; BtKey *ptr; BtVal *val; +int rate; // if found slot > desired slot and previous slot // is a librarian slot, use it @@ -2460,27 +2479,52 @@ BtVal *val; if( slotptr(set->page, idx)->dead ) break; - // now insert key into array before slot + // now insert key into array before slot, + // adding as many librarian slots as + // makes sense. - if( idx == set->page->cnt ) - idx += 2, set->page->cnt += 2, librarian = 2; - else - librarian = 1; + if( idx == set->page->cnt ) { + int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot); - set->latch->dirty = 1; - set->page->act++; + librarian = ++idx - slot; + avail /= sizeof(BtSlot); - while( idx > slot + librarian - 1 ) - *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--; + if( avail < 0 ) + avail = 0; - // add librarian slot + if( librarian > avail ) + librarian = avail; - if( librarian > 1 ) { - node = slotptr(set->page, slot++); - node->off = set->page->min; - node->type = Librarian; - node->dead = 1; + if( librarian ) { + rate = (idx - slot) / librarian; + set->page->cnt += librarian; + idx += librarian; + } else + rate = 0; + } else + librarian = 0, rate = 0; + + while( idx > slot ) { + // transfer slot + *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1); + idx--; + + // add librarian slot per rate + + if( librarian ) + if( (idx - slot + 1)/2 <= librarian * rate ) { +// if( rate && !(idx % rate) ) { + node = slotptr(set->page, idx--); + node->off = node[1].off; + node->type = Librarian; + node->dead = 1; + librarian--; + } } +if(librarian) +abort(); + set->latch->dirty = 1; + set->page->act++; // fill in new slot @@ -2604,23 +2648,6 @@ BtVal *val; return 0; } -typedef struct { - logseqno reqlsn; // redo log seq no required - logseqno lsn; // redo log sequence number - uint entry; // latch table entry number - uint slot:31; // page slot number - uint reuse:1; // reused previous page -} AtomicTxn; - -typedef struct { - uid page_no; // page number for split leaf - void *next; // next key to insert - uint entry:29; // latch table entry number - uint type:2; // 0 == insert, 1 == delete, 2 == free - uint nounlock:1; // don't unlock ParentModification - unsigned char leafkey[BT_keyarray]; -} AtomicKey; - // determine actual page where key is located // return slot number @@ -2663,7 +2690,7 @@ uint entry; return 0; } -BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no) +BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn) { BtKey *key = keyptr(source, src); BtVal *val = valptr(source, src); @@ -2675,7 +2702,7 @@ uint entry, slot; if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) { if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) ) return mgr->err; - set->page->lsn = locks[src].lsn; + set->page->lsn = lsn; return 0; } @@ -2699,7 +2726,7 @@ uint entry, slot; // perform delete from smaller btree // insert a delete slot if not found there -BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no) +BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn) { BtKey *key = keyptr(source, src); BtPageSet set[1]; @@ -2728,7 +2755,7 @@ BtVal *val; set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal); set->latch->dirty = 1; - set->page->lsn = locks[src].lsn; + set->page->lsn = lsn; set->page->act--; node->dead = 0; @@ -2832,24 +2859,12 @@ BtKey *ptr; BTERR bt_atomictxn (BtDb *bt, BtPage source) { uint src, idx, slot, samepage, entry, que = 0; -AtomicKey *head, *tail, *leaf; -BtPageSet set[1], prev[1]; -unsigned char value[BtId]; BtKey *key, *ptr, *key2; -BtLatchSet *latch; -AtomicTxn *locks; int result = 0; BtSlot temp[1]; logseqno lsn; -BtPage page; -BtVal *val; -uid right; int type; - locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); - head = NULL; - tail = NULL; - // stable sort the list of keys into order to // prevent deadlocks between threads. @@ -2870,11 +2885,46 @@ int type; // add entries to redo log if( bt->mgr->pagezero->redopages ) - if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) ) - for( src = 0; src++ < source->cnt; ) - locks[src].lsn = lsn; - else - return bt->mgr->err; + lsn = bt_txnredo (bt->mgr, source, bt->thread_no); + else + lsn = 0; + + // perform the individual actions in the transaction + + if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) ) + return bt->mgr->err; + + // if number of active pages + // is greater than the buffer pool + // promote page into larger btree + + if( bt->main ) + while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 ) + if( bt_txnpromote (bt) ) + return bt->mgr->err; + + // return success + + return 0; +} + +BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no) +{ +uint src, idx, slot, samepage, entry, que = 0; +AtomicKey *head, *tail, *leaf; +BtPageSet set[1], prev[1]; +unsigned char value[BtId]; +BtLatchSet *latch; +AtomicTxn *locks; +BtKey *key, *ptr; +BtPage page; +BtVal *val; +uid right; + + locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); + + head = NULL; + tail = NULL; // Load the leaf page for each key // group same page references with reuse bit @@ -2895,10 +2945,10 @@ int type; bt_unlockpage(BtLockRead, set->latch); if( !slot ) - if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) ) + if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, thread_no) ) set->latch->split = 0; else - return bt->mgr->err; + return mgr->err; if( slotptr(set->page, slot)->type == Librarian ) ptr = keyptr(set->page, ++slot); @@ -2906,7 +2956,7 @@ int type; ptr = keyptr(set->page, slot); if( !samepage ) { - locks[src].entry = set->latch - bt->mgr->latchsets; + locks[src].entry = set->latch - mgr->latchsets; locks[src].slot = slot; locks[src].reuse = 0; } else { @@ -2932,8 +2982,8 @@ int type; if( locks[src].reuse ) continue; - set->latch = bt->mgr->latchsets + locks[src].entry; - bt_lockpage (BtLockWrite, set->latch, bt->thread_no); + set->latch = mgr->latchsets + locks[src].entry; + bt_lockpage (BtLockWrite, set->latch, thread_no); } // insert or delete each key @@ -2958,22 +3008,26 @@ int type; for( idx = src; idx < samepage; idx++ ) switch( slotptr(source,idx)->type ) { case Delete: - if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) ) - return bt->mgr->err; + if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) ) + return mgr->err; break; case Duplicate: case Unique: - if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) ) - return bt->mgr->err; + if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) ) + return mgr->err; + break; + + default: + bt_atomicpage (mgr, source, locks, idx, set); break; } // after the same page operations have finished, // process master page for splits or deletion. - latch = prev->latch = bt->mgr->latchsets + locks[src].entry; - prev->page = bt_mappage (bt->mgr, prev->latch); + latch = prev->latch = mgr->latchsets + locks[src].entry; + prev->page = bt_mappage (mgr, prev->latch); samepage = src; // pick-up all splits from master page @@ -2982,8 +3036,8 @@ int type; entry = prev->latch->split; while( entry ) { - set->latch = bt->mgr->latchsets + entry; - set->page = bt_mappage (bt->mgr, set->latch); + set->latch = mgr->latchsets + entry; + set->page = bt_mappage (mgr, set->latch); entry = set->latch->split; // delete empty master page by undoing its split @@ -2992,9 +3046,9 @@ int type; if( !prev->page->act ) { memcpy (set->page->left, prev->page->left, BtId); - memcpy (prev->page, set->page, bt->mgr->page_size); - bt_lockpage (BtLockDelete, set->latch, bt->thread_no); - bt_freepage (bt->mgr, set); + memcpy (prev->page, set->page, mgr->page_size); + bt_lockpage (BtLockDelete, set->latch, thread_no); + bt_freepage (mgr, set); prev->latch->split = set->latch->split; prev->latch->dirty = 1; @@ -3007,8 +3061,8 @@ int type; if( !set->page->act ) { memcpy (prev->page->right, set->page->right, BtId); prev->latch->split = set->latch->split; - bt_lockpage (BtLockDelete, set->latch, bt->thread_no); - bt_freepage (bt->mgr, set); + bt_lockpage (BtLockDelete, set->latch, thread_no); + bt_freepage (mgr, set); continue; } @@ -3018,7 +3072,7 @@ int type; leaf = calloc (sizeof(AtomicKey), 1), que++; memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); - leaf->entry = prev->latch - bt->mgr->latchsets; + leaf->entry = prev->latch - mgr->latchsets; leaf->page_no = prev->latch->page_no; leaf->type = 0; @@ -3032,8 +3086,10 @@ int type; // splice in the left link into the split page bt_putid (set->page->left, prev->latch->page_no); - bt_lockpage(BtLockParent, prev->latch, bt->thread_no); + bt_lockpage(BtLockParent, prev->latch, thread_no); bt_unlockpage(BtLockWrite, prev->latch); + if( lsm ) + bt_syncpage (mgr, prev->page, prev->latch); *prev = *set; } @@ -3045,20 +3101,20 @@ int type; // far right sibling or set rightmost page in page zero if( right = bt_getid (prev->page->right) ) { - if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) ) - set->page = bt_mappage (bt->mgr, set->latch); + if( set->latch = bt_pinlatch (mgr, right, NULL, thread_no) ) + set->page = bt_mappage (mgr, set->latch); else - return bt->mgr->err; + return mgr->err; - bt_lockpage (BtLockWrite, set->latch, bt->thread_no); + bt_lockpage (BtLockWrite, set->latch, thread_no); bt_putid (set->page->left, prev->latch->page_no); set->latch->dirty = 1; bt_unlockpage (BtLockWrite, set->latch); - bt_unpinlatch (bt->mgr, set->latch); + bt_unpinlatch (mgr, set->latch); } else { // prev is rightmost page - bt_mutexlock (bt->mgr->lock); - bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); - bt_releasemutex(bt->mgr->lock); + bt_mutexlock (mgr->lock); + bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no); + bt_releasemutex(mgr->lock); } // Process last page split in chain @@ -3067,7 +3123,7 @@ int type; leaf = calloc (sizeof(AtomicKey), 1), que++; memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); - leaf->entry = prev->latch - bt->mgr->latchsets; + leaf->entry = prev->latch - mgr->latchsets; leaf->page_no = prev->latch->page_no; leaf->type = 0; @@ -3078,9 +3134,12 @@ int type; tail = leaf; - bt_lockpage(BtLockParent, prev->latch, bt->thread_no); + bt_lockpage(BtLockParent, prev->latch, thread_no); bt_unlockpage(BtLockWrite, prev->latch); + if( lsm ) + bt_syncpage (mgr, prev->page, prev->latch); + // remove atomic lock on master page bt_unlockpage(BtLockAtomic, latch); @@ -3092,7 +3151,11 @@ int type; if( prev->page->act ) { bt_unlockpage(BtLockWrite, latch); bt_unlockpage(BtLockAtomic, latch); - bt_unpinlatch(bt->mgr, latch); + + if( lsm ) + bt_syncpage (mgr, prev->page, latch); + + bt_unpinlatch(mgr, latch); continue; } @@ -3104,8 +3167,8 @@ int type; ptr = keyptr(prev->page,prev->page->cnt); - if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) ) - return bt->mgr->err; + if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) ) + return mgr->err; // perform the remainder of the delete // from the FIFO queue @@ -3113,7 +3176,7 @@ int type; leaf = calloc (sizeof(AtomicKey), 1), que++; memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); - leaf->entry = prev->latch - bt->mgr->latchsets; + leaf->entry = prev->latch - mgr->latchsets; leaf->page_no = prev->latch->page_no; leaf->nounlock = 1; leaf->type = 2; @@ -3129,34 +3192,38 @@ int type; // deletion completes in next phase. bt_unlockpage(BtLockWrite, prev->latch); + + if( lsm ) + bt_syncpage (mgr, prev->page, prev->latch); + } // add & delete keys for any pages split or merged during transaction if( leaf = head ) do { - set->latch = bt->mgr->latchsets + leaf->entry; - set->page = bt_mappage (bt->mgr, set->latch); + set->latch = mgr->latchsets + leaf->entry; + set->page = bt_mappage (mgr, set->latch); bt_putid (value, leaf->page_no); ptr = (BtKey *)leaf->leafkey; switch( leaf->type ) { case 0: // insert key - if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) ) - return bt->mgr->err; + if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) + return mgr->err; break; case 1: // delete key - if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) ) - return bt->mgr->err; + if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) ) + return mgr->err; break; case 2: // free page - if( bt_atomicfree (bt->mgr, set, bt->thread_no) ) - return bt->mgr->err; + if( bt_atomicfree (mgr, set, thread_no) ) + return mgr->err; break; } @@ -3164,22 +3231,11 @@ int type; if( !leaf->nounlock ) bt_unlockpage (BtLockParent, set->latch); - bt_unpinlatch (bt->mgr, set->latch); + bt_unpinlatch (mgr, set->latch); tail = leaf->next; free (leaf); } while( leaf = tail ); - // if number of active pages - // is greater than the buffer pool - // promote page into larger btree - - if( bt->main ) - while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 ) - if( bt_txnpromote (bt) ) - return bt->mgr->err; - - // return success - free (locks); return 0; } @@ -3227,14 +3283,14 @@ BtVal *val; continue; } - bt_lockpage (BtLockAccess, set->latch, bt->thread_no); + bt_lockpage (BtLockAtomic, set->latch, bt->thread_no); bt_lockpage (BtLockWrite, set->latch, bt->thread_no); - bt_unlockpage(BtLockAccess, set->latch); // entry interiour node or being or was killed if( set->page->lvl || set->page->free || set->page->kill ) { bt_releasemutex(set->latch->modify); + bt_unlockpage(BtLockAtomic, set->latch); bt_unlockpage(BtLockWrite, set->latch); continue; } @@ -3246,28 +3302,12 @@ BtVal *val; // transfer slots in our selected page to larger btree if( !(set->latch->page_no % 100) ) -fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt); - - for( slot = 0; slot++ < set->page->cnt; ) { - ptr = keyptr (set->page, slot); - val = valptr (set->page, slot); - node = slotptr(set->page, slot); - - switch( node->type ) { - case Duplicate: - case Unique: - if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) ) - return bt->main->err; +fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act); - continue; - - case Delete: - if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) ) - return bt->main->err; + if( bt_atomicexec (bt->main, set->page, 0, 1, bt->thread_no) ) + return bt->main->err; - continue; - } - } + bt_unlockpage(BtLockAtomic, set->latch); // now delete the page diff --git a/threadskv8.c b/threadskv8.c index 3f669b8..9d17a4a 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -112,15 +112,6 @@ typedef struct { ushort serving[1]; } RWLock; -// write only queue lock - -typedef struct { - ushort ticket[1]; - ushort serving[1]; - ushort tid; - ushort dup; -} WOLock; - #define PHID 0x1 #define PRES 0x2 #define MASK 0x3 @@ -143,6 +134,14 @@ volatile typedef struct { #define BOTH 3 #define SHARE 4 +// write only reentrant lock + +typedef struct { + BtSpinLatch xcl[1]; + volatile ushort tid[1]; + volatile ushort dup[1]; +} WOLock; + // hash table entries typedef struct { @@ -411,41 +410,39 @@ uid bt_newdup (BtDb *bt) #endif } -// Write-Only Queue Lock +void bt_spinreleasewrite(BtSpinLatch *latch); +void bt_spinwritelock(BtSpinLatch *latch); + +// Write-Only Reentrant Lock void WriteOLock (WOLock *lock, ushort tid) { -ushort tix; + while( 1 ) { + bt_spinwritelock(lock->xcl); - if( lock->tid == tid ) { - lock->dup++; + if( *lock->tid == tid ) { + *lock->dup += 1; + bt_spinreleasewrite(lock->xcl); return; } -#ifdef unix - tix = __sync_fetch_and_add (lock->ticket, 1); -#else - tix = _InterlockedExchangeAdd16 (lock->ticket, 1); -#endif - // wait for our ticket to come up - - while( tix != lock->serving[0] ) -#ifdef unix - sched_yield(); -#else - SwitchToThread (); -#endif - lock->tid = tid; + if( !*lock->tid ) { + *lock->tid = tid; + bt_spinreleasewrite(lock->xcl); + return; + } + bt_spinreleasewrite(lock->xcl); + sched_yield(); + } } void WriteORelease (WOLock *lock) { - if( lock->dup ) { - lock->dup--; + if( *lock->dup ) { + *lock->dup -= 1; return; } - lock->tid = 0; - lock->serving[0]++; + *lock->tid = 0; } // Phase-Fair reader/writer lock implementation @@ -3054,7 +3051,7 @@ uint slot = 0; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->ticket != *latch->parent->serving ) + if( *latch->parent->tid ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->parent, 0, sizeof(RWLock)); @@ -3087,9 +3084,9 @@ BtKey *ptr; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->ticket != *latch->parent->serving ) + if( *latch->parent->tid ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no); - memset ((ushort *)latch->parent, 0, sizeof(RWLock)); + memset ((ushort *)latch->parent, 0, sizeof(WOLock)); if( latch->pin ) { fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);