From: unknown Date: Fri, 17 Oct 2014 23:47:34 +0000 (-0700) Subject: Merge branch 'master' of https://github.com/malbrain/Btree-source-code X-Git-Url: https://pd.if.org/git/?p=btree;a=commitdiff_plain;h=392e5f08cc164c87e56153aa78a740f93325750e Merge branch 'master' of https://github.com/malbrain/Btree-source-code Conflicts: threadskv10b.c --- 392e5f08cc164c87e56153aa78a740f93325750e diff --cc threadskv10.c index 6ded2da,6ded2da..194757b --- a/threadskv10.c +++ b/threadskv10.c @@@ -9,7 -9,7 +9,7 @@@ // redo log for failure recovery // and LSM B-trees for write optimization --// 11 OCT 2014 ++// 15 OCT 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@@ -113,8 -113,8 +113,8 @@@ typedef enum typedef struct { union { struct { -- volatile ushort xlock; // one writer has exclusive lock -- volatile ushort wrt; // count of other writers waiting ++ volatile ushort xlock[1]; // one writer has exclusive lock ++ volatile ushort wrt[1]; // count of other writers waiting } bits[1]; uint value[1]; }; @@@ -132,12 -132,12 +132,18 @@@ typedef struct volatile ushort serving[1]; } RWLock; --// write only lock ++// write only reentrant lock typedef struct { -- BtMutexLatch xcl[1]; -- volatile ushort tid[1]; -- volatile ushort dup[1]; ++ BtMutexLatch xcl[1]; ++ union { ++ struct { ++ volatile ushort tid[1]; ++ volatile ushort dup[1]; ++ } bits[1]; ++ uint value[1]; ++ }; ++ volatile uint waiters[1]; } WOLock; #define PHID 0x1 @@@ -171,7 -171,7 +177,8 @@@ typedef struct uint next; // next entry in hash table chain uint prev; // prev entry in hash table chain ushort pin; // number of accessing threads -- unsigned char dirty; // page in cache is dirty (atomic set) ++ unsigned char dirty; // page in cache is dirty (atomic setable) ++ unsigned char promote; // page in cache is dirty (atomic setable) BtMutexLatch modify[1]; // modify entry lite latch } BtLatchSet; @@@ -327,15 -327,15 +334,6 @@@ typedef struct uint reuse:1; // reused previous page } AtomicTxn; --typedef struct { -- uid page_no; // page number for split leaf -- void *next; // next key to insert -- uint entry:29; // latch table entry number -- uint type:2; // 0 == insert, 1 == delete, 2 == free -- uint nounlock:1; // don't unlock ParentModification -- unsigned char leafkey[BT_keyarray]; --} AtomicKey; -- // Catastrophic errors typedef enum { @@@ -487,14 -487,14 +485,14 @@@ uint slept = 0 while( 1 ) { *prev->value = __sync_fetch_and_or(latch->value, XCL); -- if( !prev->bits->xlock ) { // did we set XCL? ++ if( !*prev->bits->xlock ) { // did we set XCL? if( slept ) __sync_fetch_and_sub(latch->value, WRT); return; } if( !slept ) { -- prev->bits->wrt++; ++ *prev->bits->wrt += 1; __sync_fetch_and_add(latch->value, WRT); } @@@ -516,7 -516,7 +514,7 @@@ BtMutexLatch prev[1] // take write access if exclusive bit was clear -- return !prev->bits->xlock; ++ return !*prev->bits->xlock; } // clear write mode @@@ -527,40 -527,40 +525,82 @@@ BtMutexLatch prev[1] *prev->value = __sync_fetch_and_and(latch->value, ~XCL); -- if( prev->bits->wrt ) ++ if( *prev->bits->wrt ) sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr ); } --// Write-Only Queue Lock ++// Write-Only Reentrant Lock void WriteOLock (WOLock *lock, ushort tid) { ++uint prev, waited = 0; ++ while( 1 ) { bt_mutexlock(lock->xcl); -- if( *lock->tid == tid ) { -- *lock->dup += 1; ++ if( waited ) ++ *lock->waiters -= 1; ++ ++ if( *lock->bits->tid == tid ) { ++ *lock->bits->dup += 1; bt_releasemutex(lock->xcl); return; } -- if( !*lock->tid ) { -- *lock->tid = tid; ++ if( !*lock->bits->tid ) { ++ *lock->bits->tid = tid; bt_releasemutex(lock->xcl); return; } ++ ++ waited = 1; ++ *lock->waiters += 1; ++ prev = *lock->value; ++ bt_releasemutex(lock->xcl); -- sched_yield(); ++ ++ sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, prev, NULL, NULL, QueWr ); } } void WriteORelease (WOLock *lock) { -- if( *lock->dup ) { -- *lock->dup -= 1; -- return; ++ bt_mutexlock(lock->xcl); ++ ++ if( *lock->bits->dup ) { ++ *lock->bits->dup -= 1; ++ bt_releasemutex(lock->xcl); ++ return; ++ } ++ ++ *lock->bits->tid = 0; ++ ++ if( *lock->waiters ) ++ sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueWr ); ++ bt_releasemutex(lock->xcl); ++} ++ ++// clear lock of holders and waiters ++ ++ClearWOLock (WOLock *lock) ++{ ++ while( 1 ) { ++ bt_mutexlock(lock->xcl); ++ ++ if( *lock->waiters ) { ++ bt_releasemutex(lock->xcl); ++ sched_yield(); ++ continue; ++ } ++ ++ if( *lock->bits->tid ) { ++ bt_releasemutex(lock->xcl); ++ sched_yield(); ++ continue; } -- *lock->tid = 0; ++ bt_releasemutex(lock->xcl); ++ return; ++ } } // Phase-Fair reader/writer lock implementation @@@ -1493,6 -1493,6 +1533,10 @@@ void bt_lockpage(BtLock mode, BtLatchSe WriteOLock (latch->atomic, thread_no); ReadLock (latch->readwr, thread_no); break; ++ case BtLockAtomic | BtLockWrite: ++ WriteOLock (latch->atomic, thread_no); ++ WriteLock (latch->readwr, thread_no); ++ break; } } @@@ -1523,6 -1523,6 +1567,10 @@@ void bt_unlockpage(BtLock mode, BtLatch WriteORelease (latch->atomic); ReadRelease (latch->readwr); break; ++ case BtLockAtomic | BtLockWrite: ++ WriteORelease (latch->atomic); ++ WriteRelease (latch->readwr); ++ break; } } @@@ -1539,7 -1539,7 +1587,7 @@@ int blk bt_mutexlock(mgr->lock); // use empty chain first -- // else allocate empty page ++ // else allocate new page if( page_no = bt_getid(mgr->pagezero->freechain) ) { if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) ) @@@ -1549,14 -1549,14 +1597,22 @@@ mgr->pagezero->activepages++; bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right)); -- if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) -- fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); -- bt_releasemutex(mgr->lock); ++ // the page is currently free and this ++ // will keep bt_txnpromote out. ++ ++ // contents will replace this bit ++ // and pin will keep bt_txnpromote out contents->page_no = page_no; -- memcpy (set->page, contents, mgr->page_size); set->latch->dirty = 1; ++ ++ memcpy (set->page, contents, mgr->page_size); ++ ++ if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) ++ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); ++ ++ bt_releasemutex(mgr->lock); return 0; } @@@ -1571,6 -1571,6 +1627,9 @@@ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); bt_releasemutex(mgr->lock); ++ // keep bt_txnpromote out of this page ++ ++ contents->free = 1; contents->page_no = page_no; pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits); @@@ -1581,6 -1581,6 +1640,9 @@@ else return mgr->err; ++ // now pin will keep bt_txnpromote out ++ ++ set->page->free = 0; return 0; } @@@ -1622,10 -1622,10 +1684,11 @@@ uint good = 0 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no) { --uid page_no = ROOT_page, prevpage = 0; ++uid page_no = ROOT_page, prevpage_no = 0; uint drill = 0xff, slot; BtLatchSet *prevlatch; uint mode, prevmode; ++BtPage prevpage; BtVal *val; // start at root of btree and drill down @@@ -1643,13 -1643,13 +1706,15 @@@ bt_lockpage(BtLockAccess, set->latch, thread_no); set->page = bt_mappage (mgr, set->latch); ++if( set->latch->promote ) ++abort(); // release & unpin parent or left sibling page -- if( prevpage ) { ++ if( prevpage_no ) { bt_unlockpage(prevmode, prevlatch); bt_unpinlatch (mgr, prevlatch); -- prevpage = 0; ++ prevpage_no = 0; } // obtain mode lock using lock chaining through AccessLock @@@ -1677,8 -1677,8 +1742,9 @@@ } } -- prevpage = set->latch->page_no; ++ prevpage_no = set->latch->page_no; prevlatch = set->latch; ++ prevpage = set->page; prevmode = mode; // find key on page at this level @@@ -1721,6 -1721,6 +1787,7 @@@ // return page to free list // page must be delete & write locked ++// and have no keys pointing to it. void bt_freepage (BtMgr *mgr, BtPageSet *set) { @@@ -1732,12 -1732,12 +1799,14 @@@ memcpy(set->page->right, mgr->pagezero->freechain, BtId); bt_putid(mgr->pagezero->freechain, set->latch->page_no); ++ set->latch->promote = 0; set->latch->dirty = 1; set->page->free = 1; // decrement active page count mgr->pagezero->activepages--; ++ if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); @@@ -1845,7 -1845,7 +1914,7 @@@ uint idx // returns with page removed // from the page pool. --BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no) ++BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, int delkey) { unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray]; unsigned char value[BtId]; @@@ -1854,7 -1854,7 +1923,6 @@@ BtPageSet right[1] uid page_no; BtKey *ptr; -- // cache copy of fence key // to remove in parent @@@ -1912,7 -1912,7 +1980,8 @@@ ptr = (BtKey*)lowerfence; -- if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) ) ++ if( delkey ) ++ if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) ) return mgr->err; // obtain delete and write locks to right node @@@ -1978,9 -1978,9 +2047,12 @@@ BtVal *val break; } ++ if( !found ) ++ return 0; ++ // did we delete a fence key in an upper level? -- if( found && lvl && set->page->act && fence ) ++ if( lvl && set->page->act && fence ) if( bt_fixfence (mgr, set, lvl, thread_no) ) return mgr->err; else @@@ -1988,7 -1988,7 +2060,7 @@@ // do we need to collapse root? -- if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 ) ++ if( set->latch->page_no == ROOT_page && set->page->act == 1 ) if( bt_collapseroot (mgr, set, thread_no) ) return mgr->err; else @@@ -1996,15 -1996,15 +2068,12 @@@ // delete empty page -- if( !set->page->act ) { -- if( bt_deletepage (mgr, set, thread_no) ) -- return mgr->err; -- } else { -- set->latch->dirty = 1; -- bt_unlockpage(BtLockWrite, set->latch); -- bt_unpinlatch (mgr, set->latch); -- } ++ if( !set->page->act ) ++ return bt_deletepage (mgr, set, thread_no, 1); ++ set->latch->dirty = 1; ++ bt_unlockpage(BtLockWrite, set->latch); ++ bt_unpinlatch (mgr, set->latch); return 0; } @@@ -2270,7 -2270,7 +2339,7 @@@ BtVal *val // split already locked full node // leave it locked. // return pool entry for new right --// page, unlocked ++// page, pinned & unlocked uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no) { @@@ -2385,8 -2385,8 +2454,8 @@@ uint prev } // fix keys for newly split page --// call with page locked, return --// unlocked ++// call with both pages pinned & locked ++// return unlocked and unpinned BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no) { @@@ -2706,13 -2706,13 +2775,15 @@@ uint entry, slot return 0; } ++ // split page ++ if( entry = bt_splitpage (mgr, set, thread_no) ) latch = mgr->latchsets + entry; else return mgr->err; // splice right page into split chain -- // and WriteLock it. ++ // and WriteLock it bt_lockpage(BtLockWrite, latch, thread_no); latch->split = set->latch->split; @@@ -2763,99 -2763,99 +2834,15 @@@ BtVal *val return 0; } --// delete an empty master page for a transaction -- --// note that the far right page never empties because --// it always contains (at least) the infinite stopper key --// and that all pages that don't contain any keys are --// deleted, or are being held under Atomic lock. -- --BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no) ++int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page) { --BtPageSet right[1], temp[1]; --unsigned char value[BtId]; --uid right_page_no; --BtKey *ptr; -- -- bt_lockpage(BtLockWrite, prev->latch, thread_no); -- -- // grab the right sibling -- -- if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) ) -- right->page = bt_mappage (mgr, right->latch); -- else -- return mgr->err; -- -- bt_lockpage(BtLockAtomic, right->latch, thread_no); -- bt_lockpage(BtLockWrite, right->latch, thread_no); -- -- // and pull contents over empty page -- // while preserving master's left link -- -- memcpy (right->page->left, prev->page->left, BtId); -- memcpy (prev->page, right->page, mgr->page_size); -- -- // forward seekers to old right sibling -- // to new page location in set -- -- bt_putid (right->page->right, prev->latch->page_no); -- right->latch->dirty = 1; -- right->page->kill = 1; -- -- // remove pointer to right page for searchers by -- // changing right fence key to point to the master page -- -- ptr = keyptr(right->page,right->page->cnt); -- bt_putid (value, prev->latch->page_no); -- -- if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) -- return mgr->err; -- -- // now that master page is in good shape we can -- // remove its locks. -- -- bt_unlockpage (BtLockAtomic, prev->latch); -- bt_unlockpage (BtLockWrite, prev->latch); -- -- // fix master's right sibling's left pointer -- // to remove scanner's poiner to the right page -- -- if( right_page_no = bt_getid (prev->page->right) ) { -- if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) ) -- temp->page = bt_mappage (mgr, temp->latch); -- else -- return mgr->err; -- -- bt_lockpage (BtLockWrite, temp->latch, thread_no); -- bt_putid (temp->page->left, prev->latch->page_no); -- temp->latch->dirty = 1; -- -- bt_unlockpage (BtLockWrite, temp->latch); -- bt_unpinlatch (mgr, temp->latch); -- } else { // master is now the far right page -- bt_mutexlock (mgr->lock); -- bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no); -- bt_releasemutex(mgr->lock); -- } ++BtKey *key1 = (BtKey *)((char *)page + slot1->off); ++BtKey *key2 = (BtKey *)((char *)page + slot2->off); -- // now that there are no pointers to the right page -- // we can delete it after the last read access occurs -- -- bt_unlockpage (BtLockWrite, right->latch); -- bt_unlockpage (BtLockAtomic, right->latch); -- bt_lockpage (BtLockDelete, right->latch, thread_no); -- bt_lockpage (BtLockWrite, right->latch, thread_no); -- bt_freepage (mgr, right); -- return 0; ++ return keycmp (key1, key2->key, key2->len); } -- // atomic modification of a batch of keys. --// return -1 if BTERR is set --// otherwise return slot number --// causing the key constraint violation --// or zero on successful completion. -- BTERR bt_atomictxn (BtDb *bt, BtPage source) { uint src, idx, slot, samepage, entry, que = 0; @@@ -2867,7 -2867,7 +2854,7 @@@ int type // stable sort the list of keys into order to // prevent deadlocks between threads. -- ++/* for( src = 1; src++ < source->cnt; ) { *temp = *slotptr(source,src); key = keyptr (source,src); @@@ -2881,7 -2881,7 +2868,8 @@@ break; } } -- ++*/ ++ qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source); // add entries to redo log if( bt->mgr->pagezero->redopages ) @@@ -2911,7 -2911,7 +2899,6 @@@ BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no) { uint src, idx, slot, samepage, entry, que = 0; --AtomicKey *head, *tail, *leaf; BtPageSet set[1], prev[1]; unsigned char value[BtId]; BtLatchSet *latch; @@@ -2923,9 -2923,9 +2910,6 @@@ uid right locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); -- head = NULL; -- tail = NULL; -- // Load the leaf page for each key // group same page references with reuse bit // and determine any constraint violations @@@ -2941,11 -2941,11 +2925,9 @@@ if( samepage = src > 1 ) if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 ) slot = bt_findslot(set->page, key->key, key->len); -- else -- bt_unlockpage(BtLockRead, set->latch); if( !slot ) -- if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, thread_no) ) ++ if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic, thread_no) ) set->latch->split = 0; else return mgr->err; @@@ -2970,13 -2970,13 +2952,7 @@@ locks[src].reqlsn = set->page->lsn; } -- // unlock last loadpage lock -- -- if( source->cnt ) -- bt_unlockpage(BtLockRead, set->latch); -- // obtain write lock for each master page -- // sync flushed pages to disk for( src = 0; src++ < source->cnt; ) { if( locks[src].reuse ) @@@ -2988,11 -2988,11 +2964,6 @@@ // insert or delete each key // process any splits or merges -- // release Write & Atomic latches -- // set ParentModifications and build -- // queue of keys to insert for split pages -- // or delete for deleted pages. -- // run through txn list backwards samepage = source->cnt + 1; @@@ -3031,70 -3031,70 +3002,62 @@@ samepage = src; // pick-up all splits from master page -- // each one is already WriteLocked. -- -- entry = prev->latch->split; ++ // each one is already pinned & WriteLocked. -- while( entry ) { ++ if( entry = prev->latch->split ) do { set->latch = mgr->latchsets + entry; set->page = bt_mappage (mgr, set->latch); -- entry = set->latch->split; // delete empty master page by undoing its split // (this is potentially another empty page) -- // note that there are no new left pointers yet ++ // note that there are no pointers to it yet if( !prev->page->act ) { memcpy (set->page->left, prev->page->left, BtId); memcpy (prev->page, set->page, mgr->page_size); bt_lockpage (BtLockDelete, set->latch, thread_no); -- bt_freepage (mgr, set); -- prev->latch->split = set->latch->split; prev->latch->dirty = 1; ++ bt_freepage (mgr, set); continue; } -- // remove empty page from the split chain -- // and return it to the free list. ++ // remove empty split page from the split chain ++ // and return it to the free list. No other ++ // thread has its page number yet. if( !set->page->act ) { memcpy (prev->page->right, set->page->right, BtId); prev->latch->split = set->latch->split; ++ bt_lockpage (BtLockDelete, set->latch, thread_no); bt_freepage (mgr, set); continue; } -- // schedule prev fence key update ++ // update prev's fence key ptr = keyptr(prev->page,prev->page->cnt); -- leaf = calloc (sizeof(AtomicKey), 1), que++; -- -- memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); -- leaf->entry = prev->latch - mgr->latchsets; -- leaf->page_no = prev->latch->page_no; -- leaf->type = 0; -- -- if( tail ) -- tail->next = leaf; -- else -- head = leaf; ++ bt_putid (value, prev->latch->page_no); -- tail = leaf; ++ if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) ++ return mgr->err; // splice in the left link into the split page bt_putid (set->page->left, prev->latch->page_no); -- bt_lockpage(BtLockParent, prev->latch, thread_no); -- bt_unlockpage(BtLockWrite, prev->latch); ++ if( lsm ) bt_syncpage (mgr, prev->page, prev->latch); ++ ++ // page is unpinned below to avoid bt_txnpromote ++ ++ bt_unlockpage(BtLockWrite, prev->latch); *prev = *set; -- } ++ } while( entry = prev->latch->split ); // update left pointer in next right page from last split page -- // (if all splits were reversed, latch->split == 0) ++ // (if all splits were reversed or none occurred, latch->split == 0) if( latch->split ) { // fix left pointer in master's original (now split) @@@ -3109,6 -3109,6 +3072,7 @@@ bt_lockpage (BtLockWrite, set->latch, thread_no); bt_putid (set->page->left, prev->latch->page_no); set->latch->dirty = 1; ++ bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (mgr, set->latch); } else { // prev is rightmost page @@@ -3118,124 -3118,124 +3082,55 @@@ } // Process last page split in chain ++ // by switching the key from the master ++ // page to the last split. ptr = keyptr(prev->page,prev->page->cnt); -- leaf = calloc (sizeof(AtomicKey), 1), que++; -- -- memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); -- leaf->entry = prev->latch - mgr->latchsets; -- leaf->page_no = prev->latch->page_no; -- leaf->type = 0; -- -- if( tail ) -- tail->next = leaf; -- else -- head = leaf; ++ bt_putid (value, prev->latch->page_no); -- tail = leaf; ++ if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) ++ return mgr->err; -- bt_lockpage(BtLockParent, prev->latch, thread_no); bt_unlockpage(BtLockWrite, prev->latch); if( lsm ) bt_syncpage (mgr, prev->page, prev->latch); -- // remove atomic lock on master page -- bt_unlockpage(BtLockAtomic, latch); ++ bt_unpinlatch(mgr, latch); ++ ++ // go through the list of splits and ++ // release the latch pins ++ ++ while( entry = latch->split ) { ++ latch = mgr->latchsets + entry; ++ bt_unpinlatch(mgr, latch); ++ } ++ continue; } -- // finished if prev page occupied (either master or final split) ++ // since there are no splits, we're ++ // finished if master page occupied if( prev->page->act ) { -- bt_unlockpage(BtLockWrite, latch); -- bt_unlockpage(BtLockAtomic, latch); ++ bt_unlockpage(BtLockAtomic, prev->latch); ++ bt_unlockpage(BtLockWrite, prev->latch); if( lsm ) -- bt_syncpage (mgr, prev->page, latch); ++ bt_syncpage (mgr, prev->page, prev->latch); -- bt_unpinlatch(mgr, latch); ++ bt_unpinlatch(mgr, prev->latch); continue; } // any and all splits were reversed, and the // master page located in prev is empty, delete it -- // by pulling over master's right sibling. -- -- // Remove empty master's fence key -- -- ptr = keyptr(prev->page,prev->page->cnt); -- -- if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) ) -- return mgr->err; -- -- // perform the remainder of the delete -- // from the FIFO queue -- -- leaf = calloc (sizeof(AtomicKey), 1), que++; -- -- memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); -- leaf->entry = prev->latch - mgr->latchsets; -- leaf->page_no = prev->latch->page_no; -- leaf->nounlock = 1; -- leaf->type = 2; -- -- if( tail ) -- tail->next = leaf; -- else -- head = leaf; -- -- tail = leaf; -- -- // leave atomic lock in place until -- // deletion completes in next phase. -- -- bt_unlockpage(BtLockWrite, prev->latch); -- -- if( lsm ) -- bt_syncpage (mgr, prev->page, prev->latch); ++ if( bt_deletepage (mgr, prev, thread_no, 1) ) ++ return mgr->err; } -- // add & delete keys for any pages split or merged during transaction -- -- if( leaf = head ) -- do { -- set->latch = mgr->latchsets + leaf->entry; -- set->page = bt_mappage (mgr, set->latch); -- -- bt_putid (value, leaf->page_no); -- ptr = (BtKey *)leaf->leafkey; -- -- switch( leaf->type ) { -- case 0: // insert key -- if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) -- return mgr->err; -- -- break; -- -- case 1: // delete key -- if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) ) -- return mgr->err; -- -- break; -- -- case 2: // free page -- if( bt_atomicfree (mgr, set, thread_no) ) -- return mgr->err; -- -- break; -- } -- -- if( !leaf->nounlock ) -- bt_unlockpage (BtLockParent, set->latch); -- -- bt_unpinlatch (mgr, set->latch); -- tail = leaf->next; -- free (leaf); -- } while( leaf = tail ); -- free (locks); return 0; } @@@ -3262,7 -3262,7 +3157,6 @@@ BtVal *val continue; set->latch = bt->mgr->latchsets + entry; -- idx = set->latch->page_no % bt->mgr->latchhash; if( !bt_mutextry(set->latch->modify) ) continue; @@@ -3283,38 -3283,38 +3177,48 @@@ continue; } -- bt_lockpage (BtLockAtomic, set->latch, bt->thread_no); -- bt_lockpage (BtLockWrite, set->latch, bt->thread_no); -- -- // entry interiour node or being or was killed ++ // entry interiour node or being killed or promoted if( set->page->lvl || set->page->free || set->page->kill ) { bt_releasemutex(set->latch->modify); -- bt_unlockpage(BtLockAtomic, set->latch); -- bt_unlockpage(BtLockWrite, set->latch); continue; } // pin the page for our useage set->latch->pin++; ++ set->latch->promote = 1; bt_releasemutex(set->latch->modify); ++ bt_lockpage (BtLockWrite, set->latch, bt->thread_no); ++ ++ // remove the key for the page ++ // and wait for other threads to fade away ++ ++ ptr = keyptr(set->page, set->page->cnt); ++ ++ if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) ) ++ return bt->mgr->err; ++ ++ bt_unlockpage (BtLockWrite, set->latch); ++while( (set->latch->pin & ~CLOCK_bit) > 1 ) ++sched_yield(); ++ bt_lockpage (BtLockDelete, set->latch, bt->thread_no); ++ bt_lockpage (BtLockAtomic, set->latch, bt->thread_no); ++ bt_lockpage (BtLockWrite, set->latch, bt->thread_no); ++ // transfer slots in our selected page to larger btree if( !(set->latch->page_no % 100) ) fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act); -- if( bt_atomicexec (bt->main, set->page, 0, 1, bt->thread_no) ) ++ if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) return bt->main->err; -- bt_unlockpage(BtLockAtomic, set->latch); -- // now delete the page -- if( bt_deletepage (bt->mgr, set, bt->thread_no) ) -- return bt->mgr->err; -- -- return 0; ++ bt_unlockpage (BtLockDelete, set->latch); ++ bt_unlockpage (BtLockAtomic, set->latch); ++ return bt_deletepage (bt->mgr, set, bt->thread_no, 0); } } diff --cc threadskv10b.c index 825b5f5,fa68f22..09a657e --- a/threadskv10b.c +++ b/threadskv10b.c @@@ -2,7 -2,7 +2,6 @@@ // with reworked bt_deletekey code, // phase-fair re-entrant reader writer locks, // librarian page split code, --// duplicate key management // bi-directional cursors // traditional buffer pool manager // ACID batched key-value updates