From c78ccc15b7098665ae08c2cbe577ad16169da06b Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 29 Sep 2014 10:13:33 -0700 Subject: [PATCH] Fix latching protocol for bt_atomicload --- threadskv8.c | 193 ++++++++++++++++++++++++++------------------------- 1 file changed, 97 insertions(+), 96 deletions(-) diff --git a/threadskv8.c b/threadskv8.c index 16ad09f..02ac198 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -275,6 +275,7 @@ typedef struct { uint latchtotal; // number of page latch entries uint latchhash; // number of latch hash table slots uint latchvictim; // next latch entry to examine + ushort thread_no[1]; // next thread number BtHashEntry *hashtable; // the buffer pool hash table entries BtLatchSet *latchsets; // mapped latch set from buffer pool unsigned char *pagepool; // mapped to the buffer pool pages @@ -294,6 +295,7 @@ typedef struct { int found; // last delete or insert was found int err; // last error int reads, writes; // number of reads and writes from the btree + ushort thread_no; // thread number } BtDb; typedef enum { @@ -1118,6 +1120,11 @@ BtDb *bt = malloc (sizeof(*bt)); #endif bt->frame = (BtPage)bt->mem; bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size); +#ifdef unix + bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1; +#else + bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1); +#endif return bt; } @@ -1145,7 +1152,7 @@ int ans; // place write, read, or parent lock on requested page_no. -void bt_lockpage(BtLock mode, BtLatchSet *latch) +void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch) { switch( mode ) { case BtLockRead: @@ -1171,7 +1178,7 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch) // remove write, read, or parent lock on requested page -void bt_unlockpage(BtLock mode, BtLatchSet *latch) +void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch) { switch( mode ) { case BtLockRead: @@ -1298,27 +1305,27 @@ uint mode, prevmode; // obtain access lock using lock chaining with Access mode if( page_no > ROOT_page ) - bt_lockpage(BtLockAccess, set->latch); + bt_lockpage(bt, BtLockAccess, set->latch); set->page = bt_mappage (bt, set->latch); // release & unpin parent or left sibling page if( prevpage ) { - bt_unlockpage(prevmode, prevlatch); + bt_unlockpage(bt, prevmode, prevlatch); bt_unpinlatch (prevlatch); prevpage = 0; } // obtain mode lock using lock chaining through AccessLock - bt_lockpage(mode, set->latch); + bt_lockpage(bt, mode, set->latch); if( set->page->free ) return bt->err = BTERR_struct, 0; if( page_no > ROOT_page ) - bt_unlockpage(BtLockAccess, set->latch); + bt_unlockpage(bt, BtLockAccess, set->latch); // re-read and re-lock root after determining actual level of root @@ -1329,7 +1336,7 @@ uint mode, prevmode; drill = set->page->lvl; if( lock != BtLockRead && drill == lvl ) { - bt_unlockpage(mode, set->latch); + bt_unlockpage(bt, mode, set->latch); bt_unpinlatch (set->latch); continue; } @@ -1342,10 +1349,8 @@ uint mode, prevmode; // find key on page at this level // and descend to requested level - if( set->page->kill ) - goto slideright; - - if( slot = bt_findslot (set->page, key, len) ) { + if( !set->page->kill ) + if( slot = bt_findslot (set->page, key, len) ) { if( drill == lvl ) return slot; @@ -1360,13 +1365,11 @@ uint mode, prevmode; page_no = bt_getid(valptr(set->page, slot)->value); drill--; continue; - } + } // or slide right into next page -slideright: page_no = bt_getid(set->page->right); - } while( page_no ); // return error on end of right chain @@ -1393,8 +1396,8 @@ void bt_freepage (BtDb *bt, BtPageSet *set) // unlock released page - bt_unlockpage (BtLockDelete, set->latch); - bt_unlockpage (BtLockWrite, set->latch); + bt_unlockpage (bt, BtLockDelete, set->latch); + bt_unlockpage (bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); // unlock allocation page @@ -1424,8 +1427,8 @@ uint idx; ptr = keyptr(set->page, set->page->cnt); memcpy (leftkey, ptr, ptr->len + sizeof(BtKey)); - bt_lockpage (BtLockParent, set->latch); - bt_unlockpage (BtLockWrite, set->latch); + bt_lockpage (bt, BtLockParent, set->latch); + bt_unlockpage (bt, BtLockWrite, set->latch); // insert new (now smaller) fence key @@ -1442,7 +1445,7 @@ uint idx; if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) ) return bt->err; - bt_unlockpage (BtLockParent, set->latch); + bt_unlockpage (bt, BtLockParent, set->latch); bt_unpinlatch(set->latch); return 0; } @@ -1470,8 +1473,8 @@ uint idx; else return bt->err; - bt_lockpage (BtLockDelete, child->latch); - bt_lockpage (BtLockWrite, child->latch); + bt_lockpage (bt, BtLockDelete, child->latch); + bt_lockpage (bt, BtLockWrite, child->latch); memcpy (root->page, child->page, bt->mgr->page_size); root->latch->dirty = 1; @@ -1480,7 +1483,7 @@ uint idx; } while( root->page->lvl > 1 && root->page->act == 1 ); - bt_unlockpage (BtLockWrite, root->latch); + bt_unlockpage (bt, BtLockWrite, root->latch); bt_unpinlatch (root->latch); return 0; } @@ -1513,7 +1516,7 @@ BtKey *ptr; else return 0; - bt_lockpage (BtLockWrite, right->latch); + bt_lockpage (bt, BtLockWrite, right->latch); // cache copy of key to update @@ -1536,11 +1539,11 @@ BtKey *ptr; right->latch->dirty = 1; right->page->kill = 1; - bt_lockpage (BtLockParent, right->latch); - bt_unlockpage (BtLockWrite, right->latch); + bt_lockpage (bt, BtLockParent, right->latch); + bt_unlockpage (bt, BtLockWrite, right->latch); - bt_lockpage (BtLockParent, set->latch); - bt_unlockpage (BtLockWrite, set->latch); + bt_lockpage (bt, BtLockParent, set->latch); + bt_unlockpage (bt, BtLockWrite, set->latch); // redirect higher key directly to our new node contents @@ -1559,12 +1562,12 @@ BtKey *ptr; // obtain delete and write locks to right node - bt_unlockpage (BtLockParent, right->latch); - bt_lockpage (BtLockDelete, right->latch); - bt_lockpage (BtLockWrite, right->latch); + bt_unlockpage (bt, BtLockParent, right->latch); + bt_lockpage (bt, BtLockDelete, right->latch); + bt_lockpage (bt, BtLockWrite, right->latch); bt_freepage (bt, right); - bt_unlockpage (BtLockParent, set->latch); + bt_unlockpage (bt, BtLockParent, set->latch); bt_unpinlatch (set->latch); bt->found = 1; return 0; @@ -1633,7 +1636,7 @@ BtVal *val; return bt_deletepage (bt, set); set->latch->dirty = 1; - bt_unlockpage(BtLockWrite, set->latch); + bt_unlockpage(bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); return bt->found = found, 0; } @@ -1665,13 +1668,13 @@ uid page_no; // obtain access lock using lock chaining with Access mode - bt_lockpage(BtLockAccess, set->latch); + bt_lockpage(bt, BtLockAccess, set->latch); - bt_unlockpage(BtLockRead, prevlatch); + bt_unlockpage(bt, BtLockRead, prevlatch); bt_unpinlatch (prevlatch); - bt_lockpage(BtLockRead, set->latch); - bt_unlockpage(BtLockAccess, set->latch); + bt_lockpage(bt, BtLockRead, set->latch); + bt_unlockpage(bt, BtLockAccess, set->latch); return 1; } @@ -1729,7 +1732,7 @@ BtVal *val; } while( slot = bt_findnext (bt, set, slot) ); - bt_unlockpage (BtLockRead, set->latch); + bt_unlockpage (bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); return ret; } @@ -1885,7 +1888,7 @@ BtVal *val; // release and unpin root pages - bt_unlockpage(BtLockWrite, root->latch); + bt_unlockpage(bt, BtLockWrite, root->latch); bt_unpinlatch (root->latch); bt_unpinlatch (right); @@ -2034,10 +2037,10 @@ BtKey *ptr; // insert new fences in their parent pages - bt_lockpage (BtLockParent, right); + bt_lockpage (bt, BtLockParent, right); - bt_lockpage (BtLockParent, set->latch); - bt_unlockpage (BtLockWrite, set->latch); + bt_lockpage (bt, BtLockParent, set->latch); + bt_unlockpage (bt, BtLockWrite, set->latch); // insert new fence for reformulated left block of smaller keys @@ -2055,10 +2058,10 @@ BtKey *ptr; if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) ) return bt->err; - bt_unlockpage (BtLockParent, set->latch); + bt_unlockpage (bt, BtLockParent, set->latch); bt_unpinlatch (set->latch); - bt_unlockpage (BtLockParent, right); + bt_unlockpage (bt, BtLockParent, right); bt_unpinlatch (right); return 0; } @@ -2131,7 +2134,7 @@ BtVal *val; node->dead = 0; if( release ) { - bt_unlockpage (BtLockWrite, set->latch); + bt_unlockpage (bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); } @@ -2216,7 +2219,7 @@ uint type; slotptr(set->page, slot)->dead = 0; val->len = vallen; memcpy (val->value, value, vallen); - bt_unlockpage(BtLockWrite, set->latch); + bt_unlockpage(bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); return 0; } @@ -2250,7 +2253,7 @@ uint type; ptr->len = keylen; slotptr(set->page, slot)->off = set->page->min; - bt_unlockpage(BtLockWrite, set->latch); + bt_unlockpage(bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); return 0; } @@ -2281,7 +2284,7 @@ BtLatchSet *prevlatch; uid page_no; uint slot; - // find level zero page + // find level one slot if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) ) return 0; @@ -2307,30 +2310,28 @@ uint slot; // obtain read lock using lock chaining with Access mode // release & unpin parent/left sibling page - bt_lockpage(BtLockAccess, set->latch); + bt_lockpage(bt, BtLockAccess, set->latch); - bt_unlockpage(BtLockRead, prevlatch); + bt_unlockpage(bt, BtLockRead, prevlatch); bt_unpinlatch (prevlatch); - bt_lockpage(BtLockRead, set->latch); - bt_unlockpage(BtLockAccess, set->latch); + bt_lockpage(bt, BtLockRead, set->latch); // find key on page at this level // and descend to requested level if( !set->page->kill ) if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) { - bt_unlockpage(BtLockRead, set->latch); - bt_lockpage(BtLockAccess, set->latch); - bt_lockpage(BtLockAtomic, set->latch); - bt_lockpage(BtLockRead, set->latch); - bt_unlockpage(BtLockAccess, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); + bt_lockpage(bt, BtLockAtomic, set->latch); + bt_lockpage(bt, BtLockRead, set->latch); + bt_unlockpage(bt, BtLockAccess, set->latch); if( !set->page->kill ) if( slot = bt_findslot (set->page, key, len) ) return slot; - bt_unlockpage(BtLockAtomic, set->latch); + bt_unlockpage(bt, BtLockAtomic, set->latch); } // slide right into next page @@ -2406,7 +2407,7 @@ uint entry; latch->split = set->latch->split; set->latch->split = entry; - bt_lockpage(BtLockWrite, latch); + bt_lockpage(bt, BtLockWrite, latch); } return bt->err = BTERR_atomic; @@ -2448,7 +2449,7 @@ unsigned char value[BtId]; uid right_page_no; BtKey *ptr; - bt_lockpage(BtLockWrite, prev->latch); + bt_lockpage(bt, BtLockWrite, prev->latch); // grab the right sibling @@ -2457,8 +2458,8 @@ BtKey *ptr; else return bt->err; - bt_lockpage(BtLockAtomic, right->latch); - bt_lockpage(BtLockWrite, right->latch); + bt_lockpage(bt, BtLockAtomic, right->latch); + bt_lockpage(bt, BtLockWrite, right->latch); // and pull contents over empty page // while preserving master's left link @@ -2485,8 +2486,8 @@ BtKey *ptr; // now that master page is in good shape we can // remove its locks. - bt_unlockpage (BtLockAtomic, prev->latch); - bt_unlockpage (BtLockWrite, prev->latch); + bt_unlockpage (bt, BtLockAtomic, prev->latch); + bt_unlockpage (bt, BtLockWrite, prev->latch); // fix master's right sibling's left pointer // to remove scanner's poiner to the right page @@ -2495,11 +2496,11 @@ BtKey *ptr; if( temp->latch = bt_pinlatch (bt, right_page_no, 1) ) temp->page = bt_mappage (bt, temp->latch); - bt_lockpage (BtLockWrite, temp->latch); + bt_lockpage (bt, BtLockWrite, temp->latch); bt_putid (temp->page->left, prev->latch->page_no); temp->latch->dirty = 1; - bt_unlockpage (BtLockWrite, temp->latch); + bt_unlockpage (bt, BtLockWrite, temp->latch); bt_unpinlatch (temp->latch); } else { // master is now the far right page bt_spinwritelock (bt->mgr->lock); @@ -2510,10 +2511,10 @@ BtKey *ptr; // now that there are no pointers to the right page // we can delete it after the last read access occurs - bt_unlockpage (BtLockWrite, right->latch); - bt_unlockpage (BtLockAtomic, right->latch); - bt_lockpage (BtLockDelete, right->latch); - bt_lockpage (BtLockWrite, right->latch); + bt_unlockpage (bt, BtLockWrite, right->latch); + bt_unlockpage (bt, BtLockAtomic, right->latch); + bt_lockpage (bt, BtLockDelete, right->latch); + bt_lockpage (bt, BtLockWrite, right->latch); bt_freepage (bt, right); return 0; } @@ -2578,7 +2579,7 @@ int type; if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 ) slot = bt_findslot(set->page, key->key, key->len); else // release read on previous page - bt_unlockpage(BtLockRead, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); if( !slot ) if( slot = bt_atomicload(bt, set, key->key, key->len) ) @@ -2610,13 +2611,13 @@ int type; // return constraint violation if key already exists - bt_unlockpage(BtLockRead, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); result = src; while( src ) { if( locks[src].entry ) { set->latch = bt->mgr->latchsets + locks[src].entry; - bt_unlockpage(BtLockAtomic, set->latch); + bt_unlockpage(bt, BtLockAtomic, set->latch); bt_unpinlatch (set->latch); } src--; @@ -2631,7 +2632,7 @@ int type; // unlock last loadpage lock if( source->cnt > 1 ) - bt_unlockpage(BtLockRead, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); // obtain write lock for each master page @@ -2639,7 +2640,7 @@ int type; if( locks[src].reuse ) continue; else - bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry); + bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry); // insert or delete each key // process any splits or merges @@ -2698,7 +2699,7 @@ int type; if( !prev->page->act ) { memcpy (set->page->left, prev->page->left, BtId); memcpy (prev->page, set->page, bt->mgr->page_size); - bt_lockpage (BtLockDelete, set->latch); + bt_lockpage (bt, BtLockDelete, set->latch); bt_freepage (bt, set); prev->latch->dirty = 1; @@ -2710,7 +2711,7 @@ int type; if( !set->page->act ) { memcpy (prev->page->right, set->page->right, BtId); prev->latch->split = set->latch->split; - bt_lockpage (BtLockDelete, set->latch); + bt_lockpage (bt, BtLockDelete, set->latch); bt_freepage (bt, set); continue; } @@ -2735,8 +2736,8 @@ int type; // splice in the left link into the split page bt_putid (set->page->left, prev->latch->page_no); - bt_lockpage(BtLockParent, prev->latch); - bt_unlockpage(BtLockWrite, prev->latch); + bt_lockpage(bt, BtLockParent, prev->latch); + bt_unlockpage(bt, BtLockWrite, prev->latch); *prev = *set; } @@ -2753,10 +2754,10 @@ int type; else return -1; - bt_lockpage (BtLockWrite, set->latch); + bt_lockpage (bt, BtLockWrite, set->latch); bt_putid (set->page->left, prev->latch->page_no); set->latch->dirty = 1; - bt_unlockpage (BtLockWrite, set->latch); + bt_unlockpage (bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); } else { // prev is rightmost page bt_spinwritelock (bt->mgr->lock); @@ -2781,20 +2782,20 @@ int type; tail = leaf; - bt_lockpage(BtLockParent, prev->latch); - bt_unlockpage(BtLockWrite, prev->latch); + bt_lockpage(bt, BtLockParent, prev->latch); + bt_unlockpage(bt, BtLockWrite, prev->latch); // remove atomic lock on master page - bt_unlockpage(BtLockAtomic, latch); + bt_unlockpage(bt, BtLockAtomic, latch); continue; } // finished if prev page occupied (either master or final split) if( prev->page->act ) { - bt_unlockpage(BtLockWrite, latch); - bt_unlockpage(BtLockAtomic, latch); + bt_unlockpage(bt, BtLockWrite, latch); + bt_unlockpage(bt, BtLockAtomic, latch); bt_unpinlatch(latch); continue; } @@ -2831,7 +2832,7 @@ int type; // leave atomic lock in place until // deletion completes in next phase. - bt_unlockpage(BtLockWrite, prev->latch); + bt_unlockpage(bt, BtLockWrite, prev->latch); } // add & delete keys for any pages split or merged during transaction @@ -2865,7 +2866,7 @@ int type; } if( !leaf->nounlock ) - bt_unlockpage (BtLockParent, set->latch); + bt_unlockpage (bt, BtLockParent, set->latch); bt_unpinlatch (set->latch); tail = leaf->next; @@ -2890,9 +2891,9 @@ BtPageSet set[1]; else return 0; - bt_lockpage(BtLockRead, set->latch); + bt_lockpage(bt, BtLockRead, set->latch); memcpy (bt->cursor, set->page, bt->mgr->page_size); - bt_unlockpage(BtLockRead, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); bt->cursor_page = page_no; @@ -2923,9 +2924,9 @@ findourself: else return 0; - bt_lockpage(BtLockRead, set->latch); + bt_lockpage(bt, BtLockRead, set->latch); memcpy (bt->cursor, set->page, bt->mgr->page_size); - bt_unlockpage(BtLockRead, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); next = bt_getid (bt->cursor->right); @@ -2971,11 +2972,11 @@ uid right; else return 0; - bt_lockpage(BtLockRead, set->latch); + bt_lockpage(bt, BtLockRead, set->latch); memcpy (bt->cursor, set->page, bt->mgr->page_size); - bt_unlockpage(BtLockRead, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); slot = 0; @@ -3000,7 +3001,7 @@ uint slot; bt->cursor_page = set->latch->page_no; - bt_unlockpage(BtLockRead, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); return slot; } @@ -3329,7 +3330,7 @@ FILE *in; set->page = bt_mappage (bt, set->latch); else fprintf(stderr, "unable to obtain latch"), exit(1); - bt_lockpage (BtLockRead, set->latch); + bt_lockpage (bt, BtLockRead, set->latch); next = bt_getid (set->page->right); for( slot = 0; slot++ < set->page->cnt; ) @@ -3348,7 +3349,7 @@ FILE *in; cnt++; } - bt_unlockpage (BtLockRead, set->latch); + bt_unlockpage (bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); } while( page_no = next ); -- 2.40.0