From: unknown Date: Tue, 28 Oct 2014 02:35:20 +0000 (-0700) Subject: Replace mutex implementation, and fix a few bugs. X-Git-Url: https://pd.if.org/git/?p=btree;a=commitdiff_plain;h=3a9df57c6b4db2f7d279b228de02e02a2ef9a04e Replace mutex implementation, and fix a few bugs. --- diff --git a/threadskv10e.c b/threadskv10e.c index 6228512..fa5fdfb 100644 --- a/threadskv10e.c +++ b/threadskv10e.c @@ -112,14 +112,21 @@ typedef enum{ } BtLock; typedef struct { - volatile uint value[1]; -} BtMutexLatch; + union { + struct { + volatile unsigned char xcl[1]; + volatile unsigned char filler; + volatile ushort waiters[1]; + } bits[1]; + uint value[1]; + }; +} MutexLatch; // definition for reader/writer reentrant lock implementation typedef struct { - BtMutexLatch xcl[1]; - BtMutexLatch wrt[1]; + MutexLatch xcl[1]; + MutexLatch wrt[1]; uint readers; ushort dup; // re-entrant locks ushort tid; // owner thread-no @@ -129,7 +136,7 @@ typedef struct { // hash table entries typedef struct { - BtMutexLatch latch[1]; + MutexLatch latch[1]; uint entry; // Latch table entry at head of chain } BtHashEntry; @@ -137,7 +144,7 @@ typedef struct { typedef struct { uid page_no; // latch set page number - BtMutexLatch modify[1]; // modify entry lite latch + MutexLatch modify[1]; // modify entry lite latch RWLock readwr[1]; // read/write page lock RWLock access[1]; // Access Intent/Page delete RWLock parent[1]; // Posting of fence key in parent @@ -145,7 +152,7 @@ typedef struct { uint split; // right split page atomic insert uint next; // next entry in hash table chain uint prev; // prev entry in hash table chain - ushort pin; // number of accessing threads + volatile ushort pin; // number of accessing threads unsigned char dirty; // page in cache is dirty unsigned char leaf; // page in cache is a leaf } BtLatchSet; @@ -175,6 +182,7 @@ typedef struct { typedef enum { Unique, + Update, Librarian, Duplicate, Delete @@ -266,9 +274,9 @@ typedef struct { unsigned char *leafpool; // mapped to the leaf pool pages unsigned char *redobuff; // mapped recovery buffer pointer logseqno lsn, flushlsn; // current & first lsn flushed - BtMutexLatch redo[1]; // redo area lite latch - BtMutexLatch lock[1]; // allocation area lite latch - BtMutexLatch maps[1]; // mapping segments lite latch + MutexLatch redo[1]; // redo area lite latch + MutexLatch lock[1]; // allocation area lite latch + MutexLatch maps[1]; // mapping segments lite latch ushort thread_no[1]; // highest thread number issued ushort err_thread; // error thread number uint nlatchpage; // size of buffer pool & latchsets @@ -291,7 +299,7 @@ typedef struct { HANDLE halloc; // allocation handle HANDLE hpool; // buffer pool handle #endif - uint segments; // number of memory mapped segments + volatile uint segments; // number of memory mapped segments unsigned char *pages[64000];// memory mapped segments of b-tree } BtMgr; @@ -453,65 +461,43 @@ int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *add { return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); } -/* -void bt_mutexlock(BtMutexLatch *latch) -{ -uint idx = 200; - - while( __sync_val_compare_and_swap (latch->value, 0, 1) ) - if( !idx ) - sched_yield (); - else - idx--; -} -int bt_mutextry(BtMutexLatch *latch) -{ - return !__sync_val_compare_and_swap (latch->value, 0, 1); -} - -void bt_releasemutex(BtMutexLatch *latch) -{ - if( !__sync_lock_test_and_set (latch->value, 0) ) - abort(); -} -*/ -void bt_mutexlock(BtMutexLatch *latch) +void bt_mutexlock(MutexLatch *latch) { -uint idx; +uint idx, waited = 0; +MutexLatch prev[1]; - for( idx = 0; idx < 100; idx++ ) - if( !__sync_val_compare_and_swap (latch->value, 0, 1) ) + while( 1 ) { + for( idx = 0; idx < 100; idx++ ) { + *prev->value = __sync_fetch_and_or (latch->value, 1); + if( !*prev->bits->xcl ) { + if( waited ) + __sync_fetch_and_sub (latch->bits->waiters, 1); return; + } + } - while( __sync_lock_test_and_set (latch->value, 2) ) - sys_futex ((uint *)latch->value, FUTEX_WAIT_PRIVATE, 2, NULL, NULL, 0); + if( !waited ) { + __sync_fetch_and_add (latch->bits->waiters, 1); + *prev->bits->waiters += 1; + waited++; + } + + sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0); + } } -int bt_mutextry(BtMutexLatch *latch) +int bt_mutextry(MutexLatch *latch) { - return !__sync_val_compare_and_swap (latch->value, 0, 1); + return !__sync_lock_test_and_set (latch->bits->xcl, 1); } -void bt_releasemutex(BtMutexLatch *latch) +void bt_releasemutex(MutexLatch *latch) { -uint idx; - - if( *latch->value == 2 ) - *latch->value = 0; - else if( __sync_lock_test_and_set (latch->value, 0) == 1 ) - return; + *latch->bits->xcl = 0; - if( latch->value[0] ) - if( __sync_val_compare_and_swap (latch->value, 1, 2) ) - return; - -// for( idx = 0; idx < 200; idx++ ) -// if( latch->value[0] ) -// if( __sync_val_compare_and_swap (latch->value, 1, 2) ) -// return; - - sys_futex( (uint *)latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 ); + if( *latch->bits->waiters ) + sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 ); } // reader/writer lock implementation @@ -525,8 +511,7 @@ void WriteLock (RWLock *lock, ushort tid, uint line) bt_mutexlock (lock->xcl); bt_mutexlock (lock->wrt); bt_releasemutex (lock->xcl); -//if( lock->tid ) -//abort(); + lock->line = line; lock->tid = tid; } @@ -839,7 +824,7 @@ unsigned char *perm; segment++, fragment = 0; if( segment < mgr->segments ) { - perm = mgr->pages[segment] + (fragment << mgr->page_bits); + perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits); memcpy ((unsigned char *)page + off, perm, mgr->page_size); off += mgr->page_size; @@ -860,10 +845,6 @@ unsigned char *perm; bt_releasemutex (mgr->maps); } -//if( !leaf && !page->lvl ) -//abort(); -//if( leaf && page->lvl ) -//abort(); return 0; } @@ -876,12 +857,6 @@ uint page_size = mgr->page_size; uint off = 0, segment, fragment; unsigned char *perm; -//if( !leaf && !page->lvl ) -//abort(); -//if( leaf && page->lvl ) -//abort(); -//if( !page->lvl && mgr->leaf_xtra == 8 ) -//fprintf(stderr, "WrPage %d\n", (uint)page_no); if( leaf ) page_size <<= mgr->leaf_xtra; @@ -894,7 +869,7 @@ unsigned char *perm; segment++, fragment = 0; if( segment < mgr->segments ) { - perm = mgr->pages[segment] + (fragment << mgr->page_bits); + perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits); memcpy (perm, (unsigned char *)page + off, mgr->page_size); off += mgr->page_size; fragment++; @@ -919,11 +894,11 @@ unsigned char *perm; // set CLOCK bit in latch // decrement pin count +int value; + void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line) { bt_mutexlock(latch->modify); -//if( !(latch->pin & ~CLOCK_bit) ) -//abort(); latch->pin |= CLOCK_bit; latch->pin--; @@ -937,14 +912,12 @@ BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch) uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets); BtPage page; - if( latch->leaf ) - page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool); - else - page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool); -//if( latch->leaf ) -//if( page->lvl ) -//abort(); - return page; + if( latch->leaf ) + page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool); + else + page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool); + + return page; } // return next available leaf entry @@ -1035,12 +1008,13 @@ BtPage page; bt_mutexlock(mgr->leaftable[hashidx].latch); - if( entry = mgr->leaftable[hashidx].entry ) do - { + if( entry = mgr->leaftable[hashidx].entry ) + do { latch = mgr->leafsets + entry; + if( page_no == latch->page_no ) break; - } while( entry = latch->next ); + } while( entry = latch->next ); // found our entry: increment pin @@ -1049,8 +1023,7 @@ BtPage page; bt_mutexlock(latch->modify); latch->pin |= CLOCK_bit; latch->pin++; -//if( !latch->leaf ) -//abort(); + bt_releasemutex(latch->modify); bt_releasemutex(mgr->leaftable[hashidx].latch); return latch; @@ -1059,12 +1032,8 @@ BtPage page; // find and reuse unpinned entry trynext: - entry = bt_availleaf (mgr); latch = mgr->leafsets + entry; -//if( latch->page_no ) -//if( !latch->leaf ) -//abort(); idx = latch->page_no % mgr->leafhash; @@ -1097,9 +1066,6 @@ trynext: // update permanent page area in btree from buffer pool // no read-lock is required since page is not pinned. -//if( latch->page_no ) -//if( !latch->leaf ) -//abort(); if( latch->dirty ) if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) ) return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL; @@ -1152,6 +1118,7 @@ BtPage page; if( entry = mgr->hashtable[hashidx].entry ) do { latch = mgr->latchsets + entry; + if( page_no == latch->page_no ) break; } while( entry = latch->next ); @@ -1171,7 +1138,6 @@ BtPage page; // find and reuse unpinned entry trynext: - entry = bt_availnext (mgr); latch = mgr->latchsets + entry; @@ -1206,9 +1172,6 @@ trynext: // update permanent page area in btree from buffer pool // no read-lock is required since page is not pinned. -//if( latch->page_no ) -//if( latch->leaf ) -//abort(); if( latch->dirty ) if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) ) return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL; @@ -1802,6 +1765,10 @@ BtKey *ptr; bt_lockpage(mode, set->latch, thread_no, __LINE__); + // grab our fence key + + ptr=keyptr(set->page,set->page->cnt); + if( set->page->free ) return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0; @@ -1828,11 +1795,27 @@ BtKey *ptr; prevpage = set->page; prevmode = mode; + // if requested key is beyond our fence, + // slide to the right + + if( keycmp (ptr, key, len) < 0 ) + if( page_no = bt_getid(set->page->right) ) + continue; + + // if page is part of a delete operation, + // slide to the left; + + if( set->page->kill ) { + bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__); + page_no = bt_getid(set->page->left); + bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__); + continue; + } + // find key on page at this level // and descend to requested level - if( !set->page->kill ) - if( slot = bt_findslot (set->page, key, len) ) { + if( slot = bt_findslot (set->page, key, len) ) { if( drill == lvl ) return slot; @@ -1847,7 +1830,7 @@ BtKey *ptr; val = valptr(set->page, slot); if( val->len == BtId ) - page_no = bt_getid(valptr(set->page, slot)->value); + page_no = bt_getid(val->value); else return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0; @@ -1857,9 +1840,7 @@ BtKey *ptr; // slide right into next page - bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__); page_no = bt_getid(set->page->right); - bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__); } while( page_no ); @@ -1877,8 +1858,6 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no) { unsigned char *freechain; -//if( (set->latch->pin & ~CLOCK_bit) > 1 ) -//abort(); if( set->page->lvl ) freechain = mgr->pagezero->freechain; else { @@ -2004,21 +1983,20 @@ uint idx; // returns with page unpinned // from the page pool. -BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no) +BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl) { -unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray]; +unsigned char lowerfence[BT_keyarray]; uint page_size = mgr->page_size; +BtPageSet right[1], temp[1]; unsigned char value[BtId]; -uint lvl = set->page->lvl; -BtPageSet right[1]; -uid page_no; +uid page_no, right2; BtKey *ptr; if( !lvl ) page_size <<= mgr->leaf_xtra; - // cache copy of fence key - // to remove in parent + // cache copy of original fence key + // that is being deleted. ptr = keyptr(set->page, set->page->cnt); memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey)); @@ -2031,65 +2009,71 @@ BtKey *ptr; right->page = bt_mappage (mgr, right->latch); else return 0; -//if( right->page->lvl ) -//abort(); - bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__); - - // cache copy of key to update - ptr = keyptr(right->page, right->page->cnt); - memcpy (higherfence, ptr, ptr->len + sizeof(BtKey)); + bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__); if( right->page->kill ) return mgr->line = __LINE__, mgr->err = BTERR_struct; // pull contents of right peer into our empty page + // preserving our left page number. - memcpy (right->page->left, set->page->left, BtId); + bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__); + page_no = bt_getid(set->page->left); memcpy (set->page, right->page, page_size); + bt_putid (set->page->left, page_no); + bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__); + set->page->page_no = set->latch->page_no; set->latch->dirty = 1; - // mark right page deleted and point it to left page - // until we can post parent updates that remove access - // to the deleted page. + if( right2 = bt_getid (right->page->right) ) { + if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) ) + temp->page = bt_mappage (mgr, temp->latch); + else + return 0; - bt_putid (right->page->right, set->latch->page_no); - right->latch->dirty = 1; - right->page->kill = 1; + bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__); + bt_putid (temp->page->left, set->latch->page_no); + bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__); + bt_unpinlatch (temp->latch, thread_no, __LINE__); + } else { // page is rightmost + bt_mutexlock (mgr->lock); + bt_putid (mgr->pagezero->alloc->left, set->latch->page_no); + bt_releasemutex(mgr->lock); + } - bt_lockpage (BtLockParent, right->latch, thread_no, __LINE__); - bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__); + // mark right page deleted and release lock - bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__); - bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__); + right->latch->dirty = 1; + right->page->kill = 1; // redirect higher key directly to our new node contents + ptr = keyptr(right->page, right->page->cnt); bt_putid (value, set->latch->page_no); - ptr = (BtKey*)higherfence; - if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) ) + if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) ) return mgr->err; - // delete old lower key to our node + bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__); + + // delete orignal fence key in parent + // and unlock our page. - ptr = (BtKey*)lowerfence; + ptr = (BtKey *)lowerfence; if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) ) return mgr->err; + bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__); + bt_unpinlatch (set->latch, thread_no, __LINE__); + // obtain delete and write locks to right node - bt_unlockpage (BtLockParent, right->latch, thread_no, __LINE__); bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__); bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__); -while( (right->latch->pin & ~CLOCK_bit) > 1 ) -sched_yield(); bt_freepage (mgr, right, thread_no); -//fprintf(stderr, "DelPage %d by %d at %d\n", (uint)right->latch->page_no, thread_no, __LINE__); - bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__); - bt_unpinlatch (set->latch, thread_no, __LINE__); return 0; } @@ -2166,7 +2150,7 @@ BtVal *val; // delete empty page if( !set->page->act ) - return bt_deletepage (mgr, set, thread_no); + return bt_deletepage (mgr, set, thread_no, set->page->lvl); set->latch->dirty = 1; bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__); @@ -2441,16 +2425,17 @@ BtVal *val; // return pool entry for new right // page, pinned & unlocked -uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no) +uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft) { uint page_size = mgr->page_size; +BtPageSet right[1], temp[1]; uint cnt = 0, idx = 0, max; uint lvl = set->page->lvl; -BtPageSet right[1]; BtKey *key, *ptr; BtVal *val, *src; BtPage frame; uid right2; +uint entry; uint prev; if( !set->page->lvl ) @@ -2499,14 +2484,36 @@ uint prev; // link right node - if( set->latch->page_no > ROOT_page ) - bt_putid (frame->right, bt_getid (set->page->right)); + if( set->latch->page_no > ROOT_page ) { + right2 = bt_getid (set->page->right); + bt_putid (frame->right, right2); + bt_putid (frame->left, set->latch->page_no); + } // get new free page and write higher keys to it. if( bt_newpage(mgr, right, frame, thread_no) ) return 0; + // link far right's left pointer to new page + + if( linkleft && set->latch->page_no > ROOT_page ) + if( right2 ) { + if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) ) + temp->page = bt_mappage (mgr, temp->latch); + else + return 0; + + bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__); + bt_putid (temp->page->left, right->latch->page_no); + bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__); + bt_unpinlatch (temp->latch, thread_no, __LINE__); + } else { // page is rightmost + bt_mutexlock (mgr->lock); + bt_putid (mgr->pagezero->alloc->left, right->latch->page_no); + bt_releasemutex(mgr->lock); + } + // process lower keys memcpy (frame, set->page, page_size); @@ -2520,9 +2527,6 @@ uint prev; cnt = 0; idx = 0; - if( slotptr(frame, max)->type == Librarian ) - max--; - // assemble page of smaller keys while( cnt++ < max ) { @@ -2553,7 +2557,8 @@ uint prev; set->page->cnt = idx; free(frame); - return right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets); + entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets); + return entry; } // fix keys for newly split page @@ -2568,8 +2573,6 @@ uint lvl = set->page->lvl; BtPage page; BtKey *ptr; -//if( !lvl ) -//abort(); // if current page is the root page, split it if( set->latch->page_no == ROOT_page ) @@ -2626,8 +2629,7 @@ BtKey *ptr; BtVal *val; int rate; - // if found slot > desired slot and previous slot - // is a librarian slot, use it + // if previous slot is a librarian slot, use it if( slot > 1 ) if( slotptr(set->page, slot-1)->type == Librarian ) @@ -2730,8 +2732,8 @@ BtVal *val; // if librarian slot == found slot, advance to real slot - if( node->type == Librarian ) - if( !keycmp (ptr, key, keylen) ) { + if( node->type == Librarian ) { +// if( !keycmp (ptr, key, keylen) ) { ptr = keyptr(set->page, ++slot); node = slotptr(set->page, slot); } @@ -2744,78 +2746,84 @@ BtVal *val; switch( type ) { case Unique: case Duplicate: - if( keycmp (ptr, key, keylen) ) - if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) ) { + if( !keycmp (ptr, key, keylen) ) + break; + + if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) ) if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) ) return mgr->err; + else + goto insxit; - bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__); - bt_unpinlatch (set->latch, thread_no, __LINE__); - return 0; - } else if( !(entry = bt_splitpage (mgr, set, thread_no)) ) - return mgr->err_thread = thread_no, mgr->err; - else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) ) - return mgr->err_thread = thread_no, mgr->err; - else - continue; - - // if key already exists, update value and return + if( entry = bt_splitpage (mgr, set, thread_no, 1) ) + if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) ) + continue; - val = valptr(set->page, slot); + return mgr->err_thread = thread_no, mgr->err; - if( val->len >= vallen ) { - if( slotptr(set->page, slot)->dead ) - set->page->act++; - node->type = type; - node->dead = 0; + case Update: + if( keycmp (ptr, key, keylen) ) + goto insxit; - set->page->garbage += val->len - vallen; - set->latch->dirty = 1; - val->len = vallen; - memcpy (val->value, value, vallen); - bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__); - bt_unpinlatch (set->latch, thread_no, __LINE__); - return 0; - } + break; + } - // new update value doesn't fit in existing value area - // make sure page has room + // if key already exists, update value and return - if( !node->dead ) - set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal); - else - set->page->act++; + val = valptr(set->page, slot); + if( val->len >= vallen ) { + if( node->dead ) + set->page->act++; node->type = type; node->dead = 0; - if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) ) - if( !(entry = bt_splitpage (mgr, set, thread_no)) ) - return mgr->err_thread = thread_no, mgr->err; - else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) ) - return mgr->err_thread = thread_no, mgr->err; - else + set->page->garbage += val->len - vallen; + set->latch->dirty = 1; + val->len = vallen; + memcpy (val->value, value, vallen); + goto insxit; + } + + // new update value doesn't fit in existing value area + // make sure page has room + + if( !node->dead ) + set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal); + else + set->page->act++; + + node->type = type; + node->dead = 0; + + if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) ) + break; + + if( entry = bt_splitpage (mgr, set, thread_no, 1) ) + if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) ) continue; - // copy key and value onto page and update slot + return mgr->err_thread = thread_no, mgr->err; + } + + // copy key and value onto page and update slot - set->page->min -= vallen + sizeof(BtVal); - val = (BtVal*)((unsigned char *)set->page + set->page->min); - memcpy (val->value, value, vallen); - val->len = vallen; + set->page->min -= vallen + sizeof(BtVal); + val = (BtVal*)((unsigned char *)set->page + set->page->min); + memcpy (val->value, value, vallen); + val->len = vallen; - set->latch->dirty = 1; - set->page->min -= keylen + sizeof(BtKey); - ptr = (BtKey*)((unsigned char *)set->page + set->page->min); - memcpy (ptr->key, key, keylen); - ptr->len = keylen; + set->latch->dirty = 1; + set->page->min -= keylen + sizeof(BtKey); + ptr = (BtKey*)((unsigned char *)set->page + set->page->min); + memcpy (ptr->key, key, keylen); + ptr->len = keylen; - node->off = set->page->min; - bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__); - bt_unpinlatch (set->latch, thread_no, __LINE__); - return 0; - } - } + slotptr(set->page,slot)->off = set->page->min; + +insxit: + bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__); + bt_unpinlatch (set->latch, thread_no, __LINE__); return 0; } @@ -2876,7 +2884,7 @@ uint entry, slot; // split page - if( entry = bt_splitpage (mgr, set, thread_no) ) + if( entry = bt_splitpage (mgr, set, thread_no, 0) ) latch = mgr->leafsets + entry; else return mgr->err_thread = thread_no, mgr->err; @@ -2884,7 +2892,6 @@ uint entry, slot; // splice right page into split chain // and WriteLock it -//fprintf(stderr, "SplitPg %d by %d at %d\n", (uint)latch->page_no, thread_no, __LINE__); bt_lockpage(BtLockWrite, latch, thread_no, __LINE__); latch->split = set->latch->split; set->latch->split = entry; @@ -3001,7 +3008,7 @@ int type; // promote page into larger btree if( bt->main ) - while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->thread_no ) + while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no ) if( bt_txnpromote (bt) ) return bt->mgr->err; @@ -3014,19 +3021,16 @@ int type; BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no) { -unsigned char fencekey[BT_keyarray], prvfence[BT_keyarray]; uint src, idx, samepage, entry; BtPageSet set[1], prev[1]; unsigned char value[BtId]; BtLatchSet *latch; uid right_page_no; AtomicTxn *locks; -BtKey *key, *ptr, *prv, *cur; +BtKey *key, *ptr; BtPage page; BtVal *val; -uint slot, prvslot; - locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); // Load the leaf page for each key @@ -3043,26 +3047,12 @@ uint slot, prvslot; samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0; if( !samepage ) - if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) ) -//memcpy (prvfence, fencekey, 10), -//src>1 ? memcpy (fencekey, ptr->key, 10):NULL, -//prv = ptr, -//cur = keyptr(set->page, slot), + if( bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) ) ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0; else return mgr->err; entry = set->latch - mgr->leafsets; - - // is this actually on the same page? - -//if( !samepage ) -// for( idx = src; --idx; ) -//if( entry == locks[idx].entry ) { -//fprintf(stderr, "Dup page %d by thread %d\n", (uint)set->latch->page_no, thread_no); -// abort(); -// } - locks[src].reuse = samepage; locks[src].entry = entry; @@ -3148,7 +3138,7 @@ uint slot, prvslot; ptr = keyptr(prev->page,prev->page->cnt); bt_putid (value, prev->latch->page_no); -//fprintf(stderr, "KeyIns for %d by %d at %d\n", (uint)prev->latch->page_no, thread_no, __LINE__); + if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) return mgr->err; @@ -3193,8 +3183,7 @@ uint slot, prvslot; ptr = keyptr(prev->page,prev->page->cnt); bt_putid (value, prev->latch->page_no); -//fprintf(stderr, "KeyIns for %d by %d at %d\n", (uint)prev->latch->page_no, thread_no, __LINE__); - if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) + if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) ) return mgr->err; if( lsm ) @@ -3228,7 +3217,7 @@ uint slot, prvslot; // any and all splits were reversed, and the // master page located in prev is empty, delete it - if( bt_deletepage (mgr, prev, thread_no) ) + if( bt_deletepage (mgr, prev, thread_no, 0) ) return mgr->err; } @@ -3259,6 +3248,11 @@ BtVal *val; set->latch = bt->mgr->leafsets + entry; + // skip this entry if it has never been used + + if( !set->latch->page_no ) + continue; + if( !bt_mutextry(set->latch->modify) ) continue; @@ -3271,9 +3265,9 @@ BtVal *val; set->page = bt_mappage (bt->mgr, set->latch); - // entry never used or has no left or right sibling + // entry has no right sibling - if( !set->latch->page_no || !bt_getid (set->page->right) ) { + if( !bt_getid (set->page->right) ) { bt_releasemutex(set->latch->modify); continue; } @@ -3304,7 +3298,7 @@ fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_n // now delete the page - if( bt_deletepage (bt->mgr, set, bt->thread_no) ) + if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) ) fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread); return bt->mgr->err; @@ -3494,28 +3488,37 @@ struct timeval tv[1]; } #endif -void bt_poolaudit (BtMgr *mgr) +void bt_poolaudit (BtMgr *mgr, char *type) { -BtLatchSet *latch; -uint entry = 0; +BtLatchSet *latch, test[1]; +uint entry; + + memset (test, 0, sizeof(test)); + + if( memcmp (test, mgr->latchsets, sizeof(test)) ) + fprintf(stderr, "%s latchset zero overwritten\n", type); + + if( memcmp (test, mgr->leafsets, sizeof(test)) ) + fprintf(stderr, "%s leafset zero overwritten\n", type); - while( ++entry < mgr->latchtotal ) { + for( entry = 0; ++entry < mgr->latchtotal; ) { latch = mgr->latchsets + entry; - if( *latch->readwr->wrt->value ) - fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no); + if( *latch->modify->value ) + fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no); -// if( *latch->access->bits->tid ) -// fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no); + if( latch->pin & ~CLOCK_bit ) + fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no); + } -// if( *latch->parent->bits->tid ) -// fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no); + for( entry = 0; ++entry < mgr->leaftotal; ) { + latch = mgr->leafsets + entry; if( *latch->modify->value ) - fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no); + fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no); if( latch->pin & ~CLOCK_bit ) - fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no); + fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no); } } @@ -3893,12 +3896,14 @@ BtKey *ptr; for( idx = 0; idx < cnt; idx++ ) CloseHandle(threads[idx]); #endif - bt_poolaudit(mgr); + bt_poolaudit(mgr, "cache"); if( main ) - bt_poolaudit(main); + bt_poolaudit(main, "main"); - fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found); + fprintf(stderr, "cache %d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found); + if( main ) + fprintf(stderr, "main %d reads %d writes %d found\n", main->reads, main->writes, main->found); if( main ) bt_mgrclose (main);