From: unknown Date: Fri, 17 Oct 2014 23:47:34 +0000 (-0700) Subject: Merge branch 'master' of https://github.com/malbrain/Btree-source-code X-Git-Url: https://pd.if.org/git/?p=btree;a=commitdiff_plain;h=392e5f08cc164c87e56153aa78a740f93325750e;hp=680a12ba5215c2b00d2c7088289c871a95849c47 Merge branch 'master' of https://github.com/malbrain/Btree-source-code Conflicts: threadskv10b.c --- diff --git a/threadskv10.c b/threadskv10.c index 6ded2da..194757b 100644 --- a/threadskv10.c +++ b/threadskv10.c @@ -9,7 +9,7 @@ // redo log for failure recovery // and LSM B-trees for write optimization -// 11 OCT 2014 +// 15 OCT 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -113,8 +113,8 @@ typedef enum{ typedef struct { union { struct { - volatile ushort xlock; // one writer has exclusive lock - volatile ushort wrt; // count of other writers waiting + volatile ushort xlock[1]; // one writer has exclusive lock + volatile ushort wrt[1]; // count of other writers waiting } bits[1]; uint value[1]; }; @@ -132,12 +132,18 @@ typedef struct { volatile ushort serving[1]; } RWLock; -// write only lock +// write only reentrant lock typedef struct { - BtMutexLatch xcl[1]; - volatile ushort tid[1]; - volatile ushort dup[1]; + BtMutexLatch xcl[1]; + union { + struct { + volatile ushort tid[1]; + volatile ushort dup[1]; + } bits[1]; + uint value[1]; + }; + volatile uint waiters[1]; } WOLock; #define PHID 0x1 @@ -171,7 +177,8 @@ typedef struct { uint next; // next entry in hash table chain uint prev; // prev entry in hash table chain ushort pin; // number of accessing threads - unsigned char dirty; // page in cache is dirty (atomic set) + unsigned char dirty; // page in cache is dirty (atomic setable) + unsigned char promote; // page in cache is dirty (atomic setable) BtMutexLatch modify[1]; // modify entry lite latch } BtLatchSet; @@ -327,15 +334,6 @@ typedef struct { uint reuse:1; // reused previous page } AtomicTxn; -typedef struct { - uid page_no; // page number for split leaf - void *next; // next key to insert - uint entry:29; // latch table entry number - uint type:2; // 0 == insert, 1 == delete, 2 == free - uint nounlock:1; // don't unlock ParentModification - unsigned char leafkey[BT_keyarray]; -} AtomicKey; - // Catastrophic errors typedef enum { @@ -487,14 +485,14 @@ uint slept = 0; while( 1 ) { *prev->value = __sync_fetch_and_or(latch->value, XCL); - if( !prev->bits->xlock ) { // did we set XCL? + if( !*prev->bits->xlock ) { // did we set XCL? if( slept ) __sync_fetch_and_sub(latch->value, WRT); return; } if( !slept ) { - prev->bits->wrt++; + *prev->bits->wrt += 1; __sync_fetch_and_add(latch->value, WRT); } @@ -516,7 +514,7 @@ BtMutexLatch prev[1]; // take write access if exclusive bit was clear - return !prev->bits->xlock; + return !*prev->bits->xlock; } // clear write mode @@ -527,40 +525,82 @@ BtMutexLatch prev[1]; *prev->value = __sync_fetch_and_and(latch->value, ~XCL); - if( prev->bits->wrt ) + if( *prev->bits->wrt ) sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr ); } -// Write-Only Queue Lock +// Write-Only Reentrant Lock void WriteOLock (WOLock *lock, ushort tid) { +uint prev, waited = 0; + while( 1 ) { bt_mutexlock(lock->xcl); - if( *lock->tid == tid ) { - *lock->dup += 1; + if( waited ) + *lock->waiters -= 1; + + if( *lock->bits->tid == tid ) { + *lock->bits->dup += 1; bt_releasemutex(lock->xcl); return; } - if( !*lock->tid ) { - *lock->tid = tid; + if( !*lock->bits->tid ) { + *lock->bits->tid = tid; bt_releasemutex(lock->xcl); return; } + + waited = 1; + *lock->waiters += 1; + prev = *lock->value; + bt_releasemutex(lock->xcl); - sched_yield(); + + sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, prev, NULL, NULL, QueWr ); } } void WriteORelease (WOLock *lock) { - if( *lock->dup ) { - *lock->dup -= 1; - return; + bt_mutexlock(lock->xcl); + + if( *lock->bits->dup ) { + *lock->bits->dup -= 1; + bt_releasemutex(lock->xcl); + return; + } + + *lock->bits->tid = 0; + + if( *lock->waiters ) + sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueWr ); + bt_releasemutex(lock->xcl); +} + +// clear lock of holders and waiters + +ClearWOLock (WOLock *lock) +{ + while( 1 ) { + bt_mutexlock(lock->xcl); + + if( *lock->waiters ) { + bt_releasemutex(lock->xcl); + sched_yield(); + continue; + } + + if( *lock->bits->tid ) { + bt_releasemutex(lock->xcl); + sched_yield(); + continue; } - *lock->tid = 0; + bt_releasemutex(lock->xcl); + return; + } } // Phase-Fair reader/writer lock implementation @@ -1493,6 +1533,10 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no) WriteOLock (latch->atomic, thread_no); ReadLock (latch->readwr, thread_no); break; + case BtLockAtomic | BtLockWrite: + WriteOLock (latch->atomic, thread_no); + WriteLock (latch->readwr, thread_no); + break; } } @@ -1523,6 +1567,10 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch) WriteORelease (latch->atomic); ReadRelease (latch->readwr); break; + case BtLockAtomic | BtLockWrite: + WriteORelease (latch->atomic); + WriteRelease (latch->readwr); + break; } } @@ -1539,7 +1587,7 @@ int blk; bt_mutexlock(mgr->lock); // use empty chain first - // else allocate empty page + // else allocate new page if( page_no = bt_getid(mgr->pagezero->freechain) ) { if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) ) @@ -1549,14 +1597,22 @@ int blk; mgr->pagezero->activepages++; bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right)); - if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) - fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); - bt_releasemutex(mgr->lock); + // the page is currently free and this + // will keep bt_txnpromote out. + + // contents will replace this bit + // and pin will keep bt_txnpromote out contents->page_no = page_no; - memcpy (set->page, contents, mgr->page_size); set->latch->dirty = 1; + + memcpy (set->page, contents, mgr->page_size); + + if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); + + bt_releasemutex(mgr->lock); return 0; } @@ -1571,6 +1627,9 @@ int blk; fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); bt_releasemutex(mgr->lock); + // keep bt_txnpromote out of this page + + contents->free = 1; contents->page_no = page_no; pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits); @@ -1581,6 +1640,9 @@ int blk; else return mgr->err; + // now pin will keep bt_txnpromote out + + set->page->free = 0; return 0; } @@ -1622,10 +1684,11 @@ uint good = 0; int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no) { -uid page_no = ROOT_page, prevpage = 0; +uid page_no = ROOT_page, prevpage_no = 0; uint drill = 0xff, slot; BtLatchSet *prevlatch; uint mode, prevmode; +BtPage prevpage; BtVal *val; // start at root of btree and drill down @@ -1643,13 +1706,15 @@ BtVal *val; bt_lockpage(BtLockAccess, set->latch, thread_no); set->page = bt_mappage (mgr, set->latch); +if( set->latch->promote ) +abort(); // release & unpin parent or left sibling page - if( prevpage ) { + if( prevpage_no ) { bt_unlockpage(prevmode, prevlatch); bt_unpinlatch (mgr, prevlatch); - prevpage = 0; + prevpage_no = 0; } // obtain mode lock using lock chaining through AccessLock @@ -1677,8 +1742,9 @@ BtVal *val; } } - prevpage = set->latch->page_no; + prevpage_no = set->latch->page_no; prevlatch = set->latch; + prevpage = set->page; prevmode = mode; // find key on page at this level @@ -1721,6 +1787,7 @@ BtVal *val; // return page to free list // page must be delete & write locked +// and have no keys pointing to it. void bt_freepage (BtMgr *mgr, BtPageSet *set) { @@ -1732,12 +1799,14 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set) memcpy(set->page->right, mgr->pagezero->freechain, BtId); bt_putid(mgr->pagezero->freechain, set->latch->page_no); + set->latch->promote = 0; set->latch->dirty = 1; set->page->free = 1; // decrement active page count mgr->pagezero->activepages--; + if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); @@ -1845,7 +1914,7 @@ uint idx; // returns with page removed // from the page pool. -BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no) +BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, int delkey) { unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray]; unsigned char value[BtId]; @@ -1854,7 +1923,6 @@ BtPageSet right[1]; uid page_no; BtKey *ptr; - // cache copy of fence key // to remove in parent @@ -1912,7 +1980,8 @@ BtKey *ptr; ptr = (BtKey*)lowerfence; - if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) ) + if( delkey ) + if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) ) return mgr->err; // obtain delete and write locks to right node @@ -1978,9 +2047,12 @@ BtVal *val; break; } + if( !found ) + return 0; + // did we delete a fence key in an upper level? - if( found && lvl && set->page->act && fence ) + if( lvl && set->page->act && fence ) if( bt_fixfence (mgr, set, lvl, thread_no) ) return mgr->err; else @@ -1988,7 +2060,7 @@ BtVal *val; // do we need to collapse root? - if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 ) + if( set->latch->page_no == ROOT_page && set->page->act == 1 ) if( bt_collapseroot (mgr, set, thread_no) ) return mgr->err; else @@ -1996,15 +2068,12 @@ BtVal *val; // delete empty page - if( !set->page->act ) { - if( bt_deletepage (mgr, set, thread_no) ) - return mgr->err; - } else { - set->latch->dirty = 1; - bt_unlockpage(BtLockWrite, set->latch); - bt_unpinlatch (mgr, set->latch); - } + if( !set->page->act ) + return bt_deletepage (mgr, set, thread_no, 1); + set->latch->dirty = 1; + bt_unlockpage(BtLockWrite, set->latch); + bt_unpinlatch (mgr, set->latch); return 0; } @@ -2270,7 +2339,7 @@ BtVal *val; // split already locked full node // leave it locked. // return pool entry for new right -// page, unlocked +// page, pinned & unlocked uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no) { @@ -2385,8 +2454,8 @@ uint prev; } // fix keys for newly split page -// call with page locked, return -// unlocked +// call with both pages pinned & locked +// return unlocked and unpinned BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no) { @@ -2706,13 +2775,15 @@ uint entry, slot; return 0; } + // split page + if( entry = bt_splitpage (mgr, set, thread_no) ) latch = mgr->latchsets + entry; else return mgr->err; // splice right page into split chain - // and WriteLock it. + // and WriteLock it bt_lockpage(BtLockWrite, latch, thread_no); latch->split = set->latch->split; @@ -2763,99 +2834,15 @@ BtVal *val; return 0; } -// delete an empty master page for a transaction - -// note that the far right page never empties because -// it always contains (at least) the infinite stopper key -// and that all pages that don't contain any keys are -// deleted, or are being held under Atomic lock. - -BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no) +int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page) { -BtPageSet right[1], temp[1]; -unsigned char value[BtId]; -uid right_page_no; -BtKey *ptr; - - bt_lockpage(BtLockWrite, prev->latch, thread_no); - - // grab the right sibling - - if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) ) - right->page = bt_mappage (mgr, right->latch); - else - return mgr->err; - - bt_lockpage(BtLockAtomic, right->latch, thread_no); - bt_lockpage(BtLockWrite, right->latch, thread_no); - - // and pull contents over empty page - // while preserving master's left link - - memcpy (right->page->left, prev->page->left, BtId); - memcpy (prev->page, right->page, mgr->page_size); - - // forward seekers to old right sibling - // to new page location in set - - bt_putid (right->page->right, prev->latch->page_no); - right->latch->dirty = 1; - right->page->kill = 1; - - // remove pointer to right page for searchers by - // changing right fence key to point to the master page - - ptr = keyptr(right->page,right->page->cnt); - bt_putid (value, prev->latch->page_no); - - if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) - return mgr->err; - - // now that master page is in good shape we can - // remove its locks. - - bt_unlockpage (BtLockAtomic, prev->latch); - bt_unlockpage (BtLockWrite, prev->latch); - - // fix master's right sibling's left pointer - // to remove scanner's poiner to the right page - - if( right_page_no = bt_getid (prev->page->right) ) { - if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) ) - temp->page = bt_mappage (mgr, temp->latch); - else - return mgr->err; - - bt_lockpage (BtLockWrite, temp->latch, thread_no); - bt_putid (temp->page->left, prev->latch->page_no); - temp->latch->dirty = 1; - - bt_unlockpage (BtLockWrite, temp->latch); - bt_unpinlatch (mgr, temp->latch); - } else { // master is now the far right page - bt_mutexlock (mgr->lock); - bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no); - bt_releasemutex(mgr->lock); - } - - // now that there are no pointers to the right page - // we can delete it after the last read access occurs +BtKey *key1 = (BtKey *)((char *)page + slot1->off); +BtKey *key2 = (BtKey *)((char *)page + slot2->off); - bt_unlockpage (BtLockWrite, right->latch); - bt_unlockpage (BtLockAtomic, right->latch); - bt_lockpage (BtLockDelete, right->latch, thread_no); - bt_lockpage (BtLockWrite, right->latch, thread_no); - bt_freepage (mgr, right); - return 0; + return keycmp (key1, key2->key, key2->len); } - // atomic modification of a batch of keys. -// return -1 if BTERR is set -// otherwise return slot number -// causing the key constraint violation -// or zero on successful completion. - BTERR bt_atomictxn (BtDb *bt, BtPage source) { uint src, idx, slot, samepage, entry, que = 0; @@ -2867,7 +2854,7 @@ int type; // stable sort the list of keys into order to // prevent deadlocks between threads. - +/* for( src = 1; src++ < source->cnt; ) { *temp = *slotptr(source,src); key = keyptr (source,src); @@ -2881,7 +2868,8 @@ int type; break; } } - +*/ + qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source); // add entries to redo log if( bt->mgr->pagezero->redopages ) @@ -2911,7 +2899,6 @@ int type; BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no) { uint src, idx, slot, samepage, entry, que = 0; -AtomicKey *head, *tail, *leaf; BtPageSet set[1], prev[1]; unsigned char value[BtId]; BtLatchSet *latch; @@ -2923,9 +2910,6 @@ uid right; locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); - head = NULL; - tail = NULL; - // Load the leaf page for each key // group same page references with reuse bit // and determine any constraint violations @@ -2941,11 +2925,9 @@ uid right; if( samepage = src > 1 ) if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 ) slot = bt_findslot(set->page, key->key, key->len); - else - bt_unlockpage(BtLockRead, set->latch); if( !slot ) - if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, thread_no) ) + if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic, thread_no) ) set->latch->split = 0; else return mgr->err; @@ -2970,13 +2952,7 @@ uid right; locks[src].reqlsn = set->page->lsn; } - // unlock last loadpage lock - - if( source->cnt ) - bt_unlockpage(BtLockRead, set->latch); - // obtain write lock for each master page - // sync flushed pages to disk for( src = 0; src++ < source->cnt; ) { if( locks[src].reuse ) @@ -2988,11 +2964,6 @@ uid right; // insert or delete each key // process any splits or merges - // release Write & Atomic latches - // set ParentModifications and build - // queue of keys to insert for split pages - // or delete for deleted pages. - // run through txn list backwards samepage = source->cnt + 1; @@ -3031,70 +3002,62 @@ uid right; samepage = src; // pick-up all splits from master page - // each one is already WriteLocked. - - entry = prev->latch->split; + // each one is already pinned & WriteLocked. - while( entry ) { + if( entry = prev->latch->split ) do { set->latch = mgr->latchsets + entry; set->page = bt_mappage (mgr, set->latch); - entry = set->latch->split; // delete empty master page by undoing its split // (this is potentially another empty page) - // note that there are no new left pointers yet + // note that there are no pointers to it yet if( !prev->page->act ) { memcpy (set->page->left, prev->page->left, BtId); memcpy (prev->page, set->page, mgr->page_size); bt_lockpage (BtLockDelete, set->latch, thread_no); - bt_freepage (mgr, set); - prev->latch->split = set->latch->split; prev->latch->dirty = 1; + bt_freepage (mgr, set); continue; } - // remove empty page from the split chain - // and return it to the free list. + // remove empty split page from the split chain + // and return it to the free list. No other + // thread has its page number yet. if( !set->page->act ) { memcpy (prev->page->right, set->page->right, BtId); prev->latch->split = set->latch->split; + bt_lockpage (BtLockDelete, set->latch, thread_no); bt_freepage (mgr, set); continue; } - // schedule prev fence key update + // update prev's fence key ptr = keyptr(prev->page,prev->page->cnt); - leaf = calloc (sizeof(AtomicKey), 1), que++; - - memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); - leaf->entry = prev->latch - mgr->latchsets; - leaf->page_no = prev->latch->page_no; - leaf->type = 0; - - if( tail ) - tail->next = leaf; - else - head = leaf; + bt_putid (value, prev->latch->page_no); - tail = leaf; + if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) + return mgr->err; // splice in the left link into the split page bt_putid (set->page->left, prev->latch->page_no); - bt_lockpage(BtLockParent, prev->latch, thread_no); - bt_unlockpage(BtLockWrite, prev->latch); + if( lsm ) bt_syncpage (mgr, prev->page, prev->latch); + + // page is unpinned below to avoid bt_txnpromote + + bt_unlockpage(BtLockWrite, prev->latch); *prev = *set; - } + } while( entry = prev->latch->split ); // update left pointer in next right page from last split page - // (if all splits were reversed, latch->split == 0) + // (if all splits were reversed or none occurred, latch->split == 0) if( latch->split ) { // fix left pointer in master's original (now split) @@ -3109,6 +3072,7 @@ uid right; bt_lockpage (BtLockWrite, set->latch, thread_no); bt_putid (set->page->left, prev->latch->page_no); set->latch->dirty = 1; + bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (mgr, set->latch); } else { // prev is rightmost page @@ -3118,124 +3082,55 @@ uid right; } // Process last page split in chain + // by switching the key from the master + // page to the last split. ptr = keyptr(prev->page,prev->page->cnt); - leaf = calloc (sizeof(AtomicKey), 1), que++; - - memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); - leaf->entry = prev->latch - mgr->latchsets; - leaf->page_no = prev->latch->page_no; - leaf->type = 0; - - if( tail ) - tail->next = leaf; - else - head = leaf; + bt_putid (value, prev->latch->page_no); - tail = leaf; + if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) + return mgr->err; - bt_lockpage(BtLockParent, prev->latch, thread_no); bt_unlockpage(BtLockWrite, prev->latch); if( lsm ) bt_syncpage (mgr, prev->page, prev->latch); - // remove atomic lock on master page - bt_unlockpage(BtLockAtomic, latch); + bt_unpinlatch(mgr, latch); + + // go through the list of splits and + // release the latch pins + + while( entry = latch->split ) { + latch = mgr->latchsets + entry; + bt_unpinlatch(mgr, latch); + } + continue; } - // finished if prev page occupied (either master or final split) + // since there are no splits, we're + // finished if master page occupied if( prev->page->act ) { - bt_unlockpage(BtLockWrite, latch); - bt_unlockpage(BtLockAtomic, latch); + bt_unlockpage(BtLockAtomic, prev->latch); + bt_unlockpage(BtLockWrite, prev->latch); if( lsm ) - bt_syncpage (mgr, prev->page, latch); + bt_syncpage (mgr, prev->page, prev->latch); - bt_unpinlatch(mgr, latch); + bt_unpinlatch(mgr, prev->latch); continue; } // any and all splits were reversed, and the // master page located in prev is empty, delete it - // by pulling over master's right sibling. - - // Remove empty master's fence key - - ptr = keyptr(prev->page,prev->page->cnt); - - if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) ) - return mgr->err; - - // perform the remainder of the delete - // from the FIFO queue - - leaf = calloc (sizeof(AtomicKey), 1), que++; - - memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); - leaf->entry = prev->latch - mgr->latchsets; - leaf->page_no = prev->latch->page_no; - leaf->nounlock = 1; - leaf->type = 2; - - if( tail ) - tail->next = leaf; - else - head = leaf; - - tail = leaf; - - // leave atomic lock in place until - // deletion completes in next phase. - - bt_unlockpage(BtLockWrite, prev->latch); - - if( lsm ) - bt_syncpage (mgr, prev->page, prev->latch); + if( bt_deletepage (mgr, prev, thread_no, 1) ) + return mgr->err; } - // add & delete keys for any pages split or merged during transaction - - if( leaf = head ) - do { - set->latch = mgr->latchsets + leaf->entry; - set->page = bt_mappage (mgr, set->latch); - - bt_putid (value, leaf->page_no); - ptr = (BtKey *)leaf->leafkey; - - switch( leaf->type ) { - case 0: // insert key - if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) - return mgr->err; - - break; - - case 1: // delete key - if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) ) - return mgr->err; - - break; - - case 2: // free page - if( bt_atomicfree (mgr, set, thread_no) ) - return mgr->err; - - break; - } - - if( !leaf->nounlock ) - bt_unlockpage (BtLockParent, set->latch); - - bt_unpinlatch (mgr, set->latch); - tail = leaf->next; - free (leaf); - } while( leaf = tail ); - free (locks); return 0; } @@ -3262,7 +3157,6 @@ BtVal *val; continue; set->latch = bt->mgr->latchsets + entry; - idx = set->latch->page_no % bt->mgr->latchhash; if( !bt_mutextry(set->latch->modify) ) continue; @@ -3283,38 +3177,48 @@ BtVal *val; continue; } - bt_lockpage (BtLockAtomic, set->latch, bt->thread_no); - bt_lockpage (BtLockWrite, set->latch, bt->thread_no); - - // entry interiour node or being or was killed + // entry interiour node or being killed or promoted if( set->page->lvl || set->page->free || set->page->kill ) { bt_releasemutex(set->latch->modify); - bt_unlockpage(BtLockAtomic, set->latch); - bt_unlockpage(BtLockWrite, set->latch); continue; } // pin the page for our useage set->latch->pin++; + set->latch->promote = 1; bt_releasemutex(set->latch->modify); + bt_lockpage (BtLockWrite, set->latch, bt->thread_no); + + // remove the key for the page + // and wait for other threads to fade away + + ptr = keyptr(set->page, set->page->cnt); + + if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) ) + return bt->mgr->err; + + bt_unlockpage (BtLockWrite, set->latch); +while( (set->latch->pin & ~CLOCK_bit) > 1 ) +sched_yield(); + bt_lockpage (BtLockDelete, set->latch, bt->thread_no); + bt_lockpage (BtLockAtomic, set->latch, bt->thread_no); + bt_lockpage (BtLockWrite, set->latch, bt->thread_no); + // transfer slots in our selected page to larger btree if( !(set->latch->page_no % 100) ) fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act); - if( bt_atomicexec (bt->main, set->page, 0, 1, bt->thread_no) ) + if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) return bt->main->err; - bt_unlockpage(BtLockAtomic, set->latch); - // now delete the page - if( bt_deletepage (bt->mgr, set, bt->thread_no) ) - return bt->mgr->err; - - return 0; + bt_unlockpage (BtLockDelete, set->latch); + bt_unlockpage (BtLockAtomic, set->latch); + return bt_deletepage (bt->mgr, set, bt->thread_no, 0); } } diff --git a/threadskv10b.c b/threadskv10b.c index 825b5f5..09a657e 100644 --- a/threadskv10b.c +++ b/threadskv10b.c @@ -2,7 +2,6 @@ // with reworked bt_deletekey code, // phase-fair re-entrant reader writer locks, // librarian page split code, -// duplicate key management // bi-directional cursors // traditional buffer pool manager // ACID batched key-value updates