X-Git-Url: https://pd.if.org/git/?p=btree;a=blobdiff_plain;f=threadskv8.c;h=9d17a4a6a99cccb611911c7e6fbb91b36d973e36;hp=91ec929670d8fd4ed6a27cd4fe040f8c9c038ec5;hb=HEAD;hpb=97a2992f299b35ede35012314c35857ff28d174e diff --git a/threadskv8.c b/threadskv8.c index 91ec929..9d17a4a 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -112,13 +112,6 @@ typedef struct { ushort serving[1]; } RWLock; -// write only queue lock - -typedef struct { - ushort ticket[1]; - ushort serving[1]; -} WOLock; - #define PHID 0x1 #define PRES 0x2 #define MASK 0x3 @@ -141,6 +134,14 @@ volatile typedef struct { #define BOTH 3 #define SHARE 4 +// write only reentrant lock + +typedef struct { + BtSpinLatch xcl[1]; + volatile ushort tid[1]; + volatile ushort dup[1]; +} WOLock; + // hash table entries typedef struct { @@ -275,6 +276,7 @@ typedef struct { uint latchtotal; // number of page latch entries uint latchhash; // number of latch hash table slots uint latchvictim; // next latch entry to examine + ushort thread_no[1]; // next thread number BtHashEntry *hashtable; // the buffer pool hash table entries BtLatchSet *latchsets; // mapped latch set from buffer pool unsigned char *pagepool; // mapped to the buffer pool pages @@ -294,6 +296,7 @@ typedef struct { int found; // last delete or insert was found int err; // last error int reads, writes; // number of reads and writes from the btree + ushort thread_no; // thread number } BtDb; typedef enum { @@ -407,29 +410,39 @@ uid bt_newdup (BtDb *bt) #endif } -// Write-Only Queue Lock +void bt_spinreleasewrite(BtSpinLatch *latch); +void bt_spinwritelock(BtSpinLatch *latch); -void WriteOLock (WOLock *lock) +// Write-Only Reentrant Lock + +void WriteOLock (WOLock *lock, ushort tid) { -ushort tix; -#ifdef unix - tix = __sync_fetch_and_add (lock->ticket, 1); -#else - tix = _InterlockedExchangeAdd16 (lock->ticket, 1); -#endif - // wait for our ticket to come up + while( 1 ) { + bt_spinwritelock(lock->xcl); - while( tix != lock->serving[0] ) -#ifdef unix - sched_yield(); -#else - SwitchToThread (); -#endif + if( *lock->tid == tid ) { + *lock->dup += 1; + bt_spinreleasewrite(lock->xcl); + return; + } + if( !*lock->tid ) { + *lock->tid = tid; + bt_spinreleasewrite(lock->xcl); + return; + } + bt_spinreleasewrite(lock->xcl); + sched_yield(); + } } void WriteORelease (WOLock *lock) { - lock->serving[0]++; + if( *lock->dup ) { + *lock->dup -= 1; + return; + } + + *lock->tid = 0; } // Phase-Fair reader/writer lock implementation @@ -1118,6 +1131,11 @@ BtDb *bt = malloc (sizeof(*bt)); #endif bt->frame = (BtPage)bt->mem; bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size); +#ifdef unix + bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1; +#else + bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1); +#endif return bt; } @@ -1145,7 +1163,7 @@ int ans; // place write, read, or parent lock on requested page_no. -void bt_lockpage(BtLock mode, BtLatchSet *latch) +void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch) { switch( mode ) { case BtLockRead: @@ -1161,17 +1179,21 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch) WriteLock (latch->access); break; case BtLockParent: - WriteOLock (latch->parent); + WriteOLock (latch->parent, bt->thread_no); break; case BtLockAtomic: - WriteOLock (latch->atomic); + WriteOLock (latch->atomic, bt->thread_no); + break; + case BtLockAtomic | BtLockRead: + WriteOLock (latch->atomic, bt->thread_no); + ReadLock (latch->readwr); break; } } // remove write, read, or parent lock on requested page -void bt_unlockpage(BtLock mode, BtLatchSet *latch) +void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch) { switch( mode ) { case BtLockRead: @@ -1192,6 +1214,10 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch) case BtLockAtomic: WriteORelease (latch->atomic); break; + case BtLockAtomic | BtLockRead: + WriteORelease (latch->atomic); + ReadRelease (latch->readwr); + break; } } @@ -1298,27 +1324,27 @@ uint mode, prevmode; // obtain access lock using lock chaining with Access mode if( page_no > ROOT_page ) - bt_lockpage(BtLockAccess, set->latch); + bt_lockpage(bt, BtLockAccess, set->latch); set->page = bt_mappage (bt, set->latch); // release & unpin parent or left sibling page if( prevpage ) { - bt_unlockpage(prevmode, prevlatch); + bt_unlockpage(bt, prevmode, prevlatch); bt_unpinlatch (prevlatch); prevpage = 0; } // obtain mode lock using lock chaining through AccessLock - bt_lockpage(mode, set->latch); + bt_lockpage(bt, mode, set->latch); if( set->page->free ) return bt->err = BTERR_struct, 0; if( page_no > ROOT_page ) - bt_unlockpage(BtLockAccess, set->latch); + bt_unlockpage(bt, BtLockAccess, set->latch); // re-read and re-lock root after determining actual level of root @@ -1329,7 +1355,7 @@ uint mode, prevmode; drill = set->page->lvl; if( lock != BtLockRead && drill == lvl ) { - bt_unlockpage(mode, set->latch); + bt_unlockpage(bt, mode, set->latch); bt_unpinlatch (set->latch); continue; } @@ -1342,10 +1368,8 @@ uint mode, prevmode; // find key on page at this level // and descend to requested level - if( set->page->kill ) - goto slideright; - - if( slot = bt_findslot (set->page, key, len) ) { + if( !set->page->kill ) + if( slot = bt_findslot (set->page, key, len) ) { if( drill == lvl ) return slot; @@ -1360,13 +1384,11 @@ uint mode, prevmode; page_no = bt_getid(valptr(set->page, slot)->value); drill--; continue; - } + } // or slide right into next page -slideright: page_no = bt_getid(set->page->right); - } while( page_no ); // return error on end of right chain @@ -1393,8 +1415,9 @@ void bt_freepage (BtDb *bt, BtPageSet *set) // unlock released page - bt_unlockpage (BtLockDelete, set->latch); - bt_unlockpage (BtLockWrite, set->latch); + bt_unlockpage (bt, BtLockDelete, set->latch); + bt_unlockpage (bt, BtLockWrite, set->latch); + bt_unpinlatch (set->latch); // unlock allocation page @@ -1423,8 +1446,8 @@ uint idx; ptr = keyptr(set->page, set->page->cnt); memcpy (leftkey, ptr, ptr->len + sizeof(BtKey)); - bt_lockpage (BtLockParent, set->latch); - bt_unlockpage (BtLockWrite, set->latch); + bt_lockpage (bt, BtLockParent, set->latch); + bt_unlockpage (bt, BtLockWrite, set->latch); // insert new (now smaller) fence key @@ -1441,7 +1464,7 @@ uint idx; if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) ) return bt->err; - bt_unlockpage (BtLockParent, set->latch); + bt_unlockpage (bt, BtLockParent, set->latch); bt_unpinlatch(set->latch); return 0; } @@ -1469,18 +1492,17 @@ uint idx; else return bt->err; - bt_lockpage (BtLockDelete, child->latch); - bt_lockpage (BtLockWrite, child->latch); + bt_lockpage (bt, BtLockDelete, child->latch); + bt_lockpage (bt, BtLockWrite, child->latch); memcpy (root->page, child->page, bt->mgr->page_size); root->latch->dirty = 1; bt_freepage (bt, child); - bt_unpinlatch (child->latch); } while( root->page->lvl > 1 && root->page->act == 1 ); - bt_unlockpage (BtLockWrite, root->latch); + bt_unlockpage (bt, BtLockWrite, root->latch); bt_unpinlatch (root->latch); return 0; } @@ -1513,7 +1535,7 @@ BtKey *ptr; else return 0; - bt_lockpage (BtLockWrite, right->latch); + bt_lockpage (bt, BtLockWrite, right->latch); // cache copy of key to update @@ -1536,11 +1558,11 @@ BtKey *ptr; right->latch->dirty = 1; right->page->kill = 1; - bt_lockpage (BtLockParent, right->latch); - bt_unlockpage (BtLockWrite, right->latch); + bt_lockpage (bt, BtLockParent, right->latch); + bt_unlockpage (bt, BtLockWrite, right->latch); - bt_lockpage (BtLockParent, set->latch); - bt_unlockpage (BtLockWrite, set->latch); + bt_lockpage (bt, BtLockParent, set->latch); + bt_unlockpage (bt, BtLockWrite, set->latch); // redirect higher key directly to our new node contents @@ -1559,15 +1581,13 @@ BtKey *ptr; // obtain delete and write locks to right node - bt_unlockpage (BtLockParent, right->latch); - bt_lockpage (BtLockDelete, right->latch); - bt_lockpage (BtLockWrite, right->latch); + bt_unlockpage (bt, BtLockParent, right->latch); + bt_lockpage (bt, BtLockDelete, right->latch); + bt_lockpage (bt, BtLockWrite, right->latch); bt_freepage (bt, right); - bt_unpinlatch (right->latch); - bt_unlockpage (BtLockParent, set->latch); + bt_unlockpage (bt, BtLockParent, set->latch); bt_unpinlatch (set->latch); - bt->found = 1; return 0; } @@ -1618,7 +1638,7 @@ BtVal *val; if( bt_fixfence (bt, set, lvl) ) return bt->err; else - return bt->found = found, 0; + return 0; // do we need to collapse root? @@ -1626,7 +1646,7 @@ BtVal *val; if( bt_collapseroot (bt, set) ) return bt->err; else - return bt->found = found, 0; + return 0; // delete empty page @@ -1634,9 +1654,9 @@ BtVal *val; return bt_deletepage (bt, set); set->latch->dirty = 1; - bt_unlockpage(BtLockWrite, set->latch); + bt_unlockpage(bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); - return bt->found = found, 0; + return 0; } BtKey *bt_foundkey (BtDb *bt) @@ -1666,13 +1686,13 @@ uid page_no; // obtain access lock using lock chaining with Access mode - bt_lockpage(BtLockAccess, set->latch); + bt_lockpage(bt, BtLockAccess, set->latch); - bt_unlockpage(BtLockRead, prevlatch); + bt_unlockpage(bt, BtLockRead, prevlatch); bt_unpinlatch (prevlatch); - bt_lockpage(BtLockRead, set->latch); - bt_unlockpage(BtLockAccess, set->latch); + bt_lockpage(bt, BtLockRead, set->latch); + bt_unlockpage(bt, BtLockAccess, set->latch); return 1; } @@ -1730,7 +1750,7 @@ BtVal *val; } while( slot = bt_findnext (bt, set, slot) ); - bt_unlockpage (BtLockRead, set->latch); + bt_unlockpage (bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); return ret; } @@ -1775,7 +1795,9 @@ BtVal *val; while( cnt++ < max ) { if( cnt == slot ) newslot = idx + 2; - if( cnt < max && slotptr(bt->frame,cnt)->dead ) + + if( cnt < max || bt->frame->lvl ) + if( slotptr(bt->frame,cnt)->dead ) continue; // copy the value across @@ -1792,11 +1814,9 @@ BtVal *val; // make a librarian slot - if( idx ) { - slotptr(page, ++idx)->off = nxt; - slotptr(page, idx)->type = Librarian; - slotptr(page, idx)->dead = 1; - } + slotptr(page, ++idx)->off = nxt; + slotptr(page, idx)->type = Librarian; + slotptr(page, idx)->dead = 1; // set up the slot @@ -1886,7 +1906,7 @@ BtVal *val; // release and unpin root pages - bt_unlockpage(BtLockWrite, root->latch); + bt_unlockpage(bt, BtLockWrite, root->latch); bt_unpinlatch (root->latch); bt_unpinlatch (right); @@ -1916,8 +1936,10 @@ uint prev; idx = 0; while( cnt++ < max ) { - if( slotptr(set->page, cnt)->dead && cnt < max ) + if( cnt < max || set->page->lvl ) + if( slotptr(set->page, cnt)->dead ) continue; + src = valptr(set->page, cnt); nxt -= src->len + sizeof(BtVal); memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal)); @@ -1929,11 +1951,9 @@ uint prev; // add librarian slot - if( idx ) { - slotptr(bt->frame, ++idx)->off = nxt; - slotptr(bt->frame, idx)->type = Librarian; - slotptr(bt->frame, idx)->dead = 1; - } + slotptr(bt->frame, ++idx)->off = nxt; + slotptr(bt->frame, idx)->type = Librarian; + slotptr(bt->frame, idx)->dead = 1; // add actual slot @@ -1988,11 +2008,9 @@ uint prev; // add librarian slot - if( idx ) { - slotptr(set->page, ++idx)->off = nxt; - slotptr(set->page, idx)->type = Librarian; - slotptr(set->page, idx)->dead = 1; - } + slotptr(set->page, ++idx)->off = nxt; + slotptr(set->page, idx)->type = Librarian; + slotptr(set->page, idx)->dead = 1; // add actual slot @@ -2035,10 +2053,10 @@ BtKey *ptr; // insert new fences in their parent pages - bt_lockpage (BtLockParent, right); + bt_lockpage (bt, BtLockParent, right); - bt_lockpage (BtLockParent, set->latch); - bt_unlockpage (BtLockWrite, set->latch); + bt_lockpage (bt, BtLockParent, set->latch); + bt_unlockpage (bt, BtLockWrite, set->latch); // insert new fence for reformulated left block of smaller keys @@ -2056,10 +2074,10 @@ BtKey *ptr; if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) ) return bt->err; - bt_unlockpage (BtLockParent, set->latch); + bt_unlockpage (bt, BtLockParent, set->latch); bt_unpinlatch (set->latch); - bt_unlockpage (BtLockParent, right); + bt_unlockpage (bt, BtLockParent, right); bt_unpinlatch (right); return 0; } @@ -2132,7 +2150,7 @@ BtVal *val; node->dead = 0; if( release ) { - bt_unlockpage (BtLockWrite, set->latch); + bt_unlockpage (bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); } @@ -2217,7 +2235,7 @@ uint type; slotptr(set->page, slot)->dead = 0; val->len = vallen; memcpy (val->value, value, vallen); - bt_unlockpage(BtLockWrite, set->latch); + bt_unlockpage(bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); return 0; } @@ -2251,7 +2269,7 @@ uint type; ptr->len = keylen; slotptr(set->page, slot)->off = set->page->min; - bt_unlockpage(BtLockWrite, set->latch); + bt_unlockpage(bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); return 0; } @@ -2262,7 +2280,7 @@ typedef struct { uint entry; // latch table entry number uint slot:31; // page slot number uint reuse:1; // reused previous page -} AtomicMod; +} AtomicTxn; typedef struct { uid page_no; // page number for split leaf @@ -2273,85 +2291,10 @@ typedef struct { unsigned char leafkey[BT_keyarray]; } AtomicKey; -// find and load leaf page for given key -// leave page Atomic locked and Read locked. - -int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len) -{ -BtLatchSet *prevlatch; -uid page_no; -uint slot; - - // find level zero page - - if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) ) - return 0; - - // find next non-dead entry on this page - // it will be the fence key if nothing else - - while( slotptr(set->page, slot)->dead ) - if( slot++ < set->page->cnt ) - continue; - else - return bt->err = BTERR_struct, 0; - - page_no = bt_getid(valptr(set->page, slot)->value); - prevlatch = set->latch; - - while( page_no ) { - if( set->latch = bt_pinlatch (bt, page_no, 1) ) - set->page = bt_mappage (bt, set->latch); - else - return 0; - - if( set->page->free || set->page->lvl ) - return bt->err = BTERR_struct, 0; - - // obtain read lock using lock chaining with Access mode - // release & unpin parent/left sibling page - - bt_lockpage(BtLockAccess, set->latch); - - bt_unlockpage(BtLockRead, prevlatch); - bt_unpinlatch (prevlatch); - - bt_lockpage(BtLockRead, set->latch); - bt_unlockpage(BtLockAccess, set->latch); - - // find key on page at this level - // and descend to requested level - - if( !set->page->kill ) - if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) { - bt_unlockpage(BtLockRead, set->latch); - bt_lockpage(BtLockAtomic, set->latch); - bt_lockpage(BtLockAccess, set->latch); - bt_lockpage(BtLockRead, set->latch); - bt_unlockpage(BtLockAccess, set->latch); - - if( slot = bt_findslot (set->page, key, len) ) - return slot; - - bt_unlockpage(BtLockAtomic, set->latch); - } - - // slide right into next page - - page_no = bt_getid(set->page->right); - prevlatch = set->latch; - } - - // return error on end of right chain - - bt->err = BTERR_struct; - return 0; // return error -} - // determine actual page where key is located // return slot number -uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set) +uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set) { BtKey *key = keyptr(source,src); uint slot = locks[src].slot; @@ -2368,15 +2311,18 @@ uint entry; return slot; } - // is locks->reuse set? - // if so, find where our key - // is located on previous page or split pages + // is locks->reuse set? or was slot zeroed? + // if so, find where our key is located + // on current page or pages split on + // same page txn operations. do { set->latch = bt->mgr->latchsets + entry; set->page = bt_mappage (bt, set->latch); if( slot = bt_findslot(set->page, key->key, key->len) ) { + if( slotptr(set->page, slot)->type == Librarian ) + slot++; if( locks[src].reuse ) locks[src].entry = entry; return slot; @@ -2387,17 +2333,17 @@ uint entry; return 0; } -BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src) +BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src) { BtKey *key = keyptr(source, src); BtVal *val = valptr(source, src); BtLatchSet *latch; BtPageSet set[1]; -uint entry; +uint entry, slot; - while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) { - if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) ) - return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0); + while( slot = bt_atomicpage (bt, source, locks, src, set) ) { + if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) ) + return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0); if( entry = bt_splitpage (bt, set) ) latch = bt->mgr->latchsets + entry; @@ -2407,15 +2353,16 @@ uint entry; // splice right page into split chain // and WriteLock it. + bt_lockpage(bt, BtLockWrite, latch); latch->split = set->latch->split; set->latch->split = entry; - bt_lockpage(BtLockWrite, latch); + locks[src].slot = 0; } return bt->err = BTERR_atomic; } -BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src) +BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src) { BtKey *key = keyptr(source, src); uint idx, entry, slot; @@ -2424,29 +2371,108 @@ BtKey *ptr; BtVal *val; if( slot = bt_atomicpage (bt, source, locks, src, set) ) + ptr = keyptr(set->page, slot); + else + return bt->err = BTERR_struct; + + if( !keycmp (ptr, key->key, key->len) ) + if( !slotptr(set->page, slot)->dead ) slotptr(set->page, slot)->dead = 1; + else + return 0; else - return bt->err = BTERR_struct; + return 0; - ptr = keyptr(set->page, slot); val = valptr(set->page, slot); - set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal); set->latch->dirty = 1; set->page->act--; + bt->found++; return 0; } -uint bt_parentmatch (AtomicKey *head, uint entry) +// delete an empty master page for a transaction + +// note that the far right page never empties because +// it always contains (at least) the infinite stopper key +// and that all pages that don't contain any keys are +// deleted, or are being held under Atomic lock. + +BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev) { -uint cnt = 0; +BtPageSet right[1], temp[1]; +unsigned char value[BtId]; +uid right_page_no; +BtKey *ptr; + + bt_lockpage(bt, BtLockWrite, prev->latch); + + // grab the right sibling + + if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) ) + right->page = bt_mappage (bt, right->latch); + else + return bt->err; + + bt_lockpage(bt, BtLockAtomic, right->latch); + bt_lockpage(bt, BtLockWrite, right->latch); + + // and pull contents over empty page + // while preserving master's left link + + memcpy (right->page->left, prev->page->left, BtId); + memcpy (prev->page, right->page, bt->mgr->page_size); + + // forward seekers to old right sibling + // to new page location in set + + bt_putid (right->page->right, prev->latch->page_no); + right->latch->dirty = 1; + right->page->kill = 1; + + // remove pointer to right page for searchers by + // changing right fence key to point to the master page + + ptr = keyptr(right->page,right->page->cnt); + bt_putid (value, prev->latch->page_no); + + if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) + return bt->err; + + // now that master page is in good shape we can + // remove its locks. + + bt_unlockpage (bt, BtLockAtomic, prev->latch); + bt_unlockpage (bt, BtLockWrite, prev->latch); + + // fix master's right sibling's left pointer + // to remove scanner's poiner to the right page + + if( right_page_no = bt_getid (prev->page->right) ) { + if( temp->latch = bt_pinlatch (bt, right_page_no, 1) ) + temp->page = bt_mappage (bt, temp->latch); + + bt_lockpage (bt, BtLockWrite, temp->latch); + bt_putid (temp->page->left, prev->latch->page_no); + temp->latch->dirty = 1; + + bt_unlockpage (bt, BtLockWrite, temp->latch); + bt_unpinlatch (temp->latch); + } else { // master is now the far right page + bt_spinwritelock (bt->mgr->lock); + bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); + bt_spinreleasewrite(bt->mgr->lock); + } - if( head ) - do if( head->entry == entry ) - head->nounlock = 1, cnt++; - while( head = head->next ); + // now that there are no pointers to the right page + // we can delete it after the last read access occurs - return cnt; + bt_unlockpage (bt, BtLockWrite, right->latch); + bt_unlockpage (bt, BtLockAtomic, right->latch); + bt_lockpage (bt, BtLockDelete, right->latch); + bt_lockpage (bt, BtLockWrite, right->latch); + bt_freepage (bt, right); + return 0; } // atomic modification of a batch of keys. @@ -2464,7 +2490,7 @@ BtPageSet set[1], prev[1]; unsigned char value[BtId]; BtKey *key, *ptr, *key2; BtLatchSet *latch; -AtomicMod *locks; +AtomicTxn *locks; int result = 0; BtSlot temp[1]; BtPage page; @@ -2472,7 +2498,7 @@ BtVal *val; uid right; int type; - locks = calloc (source->cnt + 1, sizeof(AtomicMod)); + locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); head = NULL; tail = NULL; @@ -2508,11 +2534,11 @@ int type; if( samepage = src > 1 ) if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 ) slot = bt_findslot(set->page, key->key, key->len); - else // release read on previous page - bt_unlockpage(BtLockRead, set->latch); + else + bt_unlockpage(bt, BtLockRead, set->latch); if( !slot ) - if( slot = bt_atomicload(bt, set, key->key, key->len) ) + if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) ) set->latch->split = 0; else return -1; @@ -2541,13 +2567,13 @@ int type; // return constraint violation if key already exists - bt_unlockpage(BtLockRead, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); result = src; while( src ) { if( locks[src].entry ) { set->latch = bt->mgr->latchsets + locks[src].entry; - bt_unlockpage(BtLockAtomic, set->latch); + bt_unlockpage(bt, BtLockAtomic, set->latch); bt_unpinlatch (set->latch); } src--; @@ -2561,8 +2587,8 @@ int type; // unlock last loadpage lock - if( source->cnt > 1 ) - bt_unlockpage(BtLockRead, set->latch); + if( source->cnt ) + bt_unlockpage(bt, BtLockRead, set->latch); // obtain write lock for each master page @@ -2570,7 +2596,7 @@ int type; if( locks[src].reuse ) continue; else - bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry); + bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry); // insert or delete each key // process any splits or merges @@ -2613,6 +2639,7 @@ int type; samepage = src; // pick-up all splits from master page + // each one is already WriteLocked. entry = prev->latch->split; @@ -2621,14 +2648,15 @@ int type; set->page = bt_mappage (bt, set->latch); entry = set->latch->split; - // delete empty master page + // delete empty master page by undoing its split + // (this is potentially another empty page) + // note that there are no new left pointers yet if( !prev->page->act ) { memcpy (set->page->left, prev->page->left, BtId); memcpy (prev->page, set->page, bt->mgr->page_size); - bt_lockpage (BtLockDelete, set->latch); + bt_lockpage (bt, BtLockDelete, set->latch); bt_freepage (bt, set); - bt_unpinlatch (set->latch); prev->latch->dirty = 1; continue; @@ -2639,9 +2667,8 @@ int type; if( !set->page->act ) { memcpy (prev->page->right, set->page->right, BtId); prev->latch->split = set->latch->split; - bt_lockpage (BtLockDelete, set->latch); + bt_lockpage (bt, BtLockDelete, set->latch); bt_freepage (bt, set); - bt_unpinlatch (set->latch); continue; } @@ -2662,9 +2689,11 @@ int type; tail = leaf; + // splice in the left link into the split page + bt_putid (set->page->left, prev->latch->page_no); - bt_lockpage(BtLockParent, prev->latch); - bt_unlockpage(BtLockWrite, prev->latch); + bt_lockpage(bt, BtLockParent, prev->latch); + bt_unlockpage(bt, BtLockWrite, prev->latch); *prev = *set; } @@ -2672,19 +2701,19 @@ int type; // (if all splits were reversed, latch->split == 0) if( latch->split ) { - // fix left pointer in master's original (now split) right sibling - // or set rightmost page in page zero + // fix left pointer in master's original (now split) + // far right sibling or set rightmost page in page zero if( right = bt_getid (prev->page->right) ) { - if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) ) + if( set->latch = bt_pinlatch (bt, right, 1) ) set->page = bt_mappage (bt, set->latch); else return -1; - bt_lockpage (BtLockWrite, set->latch); + bt_lockpage (bt, BtLockWrite, set->latch); bt_putid (set->page->left, prev->latch->page_no); set->latch->dirty = 1; - bt_unlockpage (BtLockWrite, set->latch); + bt_unlockpage (bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); } else { // prev is rightmost page bt_spinwritelock (bt->mgr->lock); @@ -2709,20 +2738,20 @@ int type; tail = leaf; - bt_lockpage(BtLockParent, prev->latch); - bt_unlockpage(BtLockWrite, prev->latch); + bt_lockpage(bt, BtLockParent, prev->latch); + bt_unlockpage(bt, BtLockWrite, prev->latch); // remove atomic lock on master page - bt_unlockpage(BtLockAtomic, latch); + bt_unlockpage(bt, BtLockAtomic, latch); continue; } // finished if prev page occupied (either master or final split) if( prev->page->act ) { - bt_unlockpage(BtLockWrite, latch); - bt_unlockpage(BtLockAtomic, latch); + bt_unlockpage(bt, BtLockWrite, latch); + bt_unlockpage(bt, BtLockAtomic, latch); bt_unpinlatch(latch); continue; } @@ -2731,71 +2760,24 @@ int type; // master page located in prev is empty, delete it // by pulling over master's right sibling. - // Delete empty master's fence key + // Remove empty master's fence key ptr = keyptr(prev->page,prev->page->cnt); - leaf = calloc (sizeof(AtomicKey), 1); - memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); - leaf->page_no = prev->latch->page_no; - leaf->entry = prev->latch->entry; - leaf->type = 1; - - if( tail ) - tail->next = leaf; - else - head = leaf; - - tail = leaf; - - bt_lockpage(BtLockParent, prev->latch); - bt_unlockpage(BtLockWrite, prev->latch); - - // grab master's right sibling - // note that the far right page never empties because - // it always contains (at least) the infinite stopper key - // and that all pages that don't contain any keys have - // been deleted, or are being deleted under write lock. - - if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) ) - set->page = bt_mappage (bt, set->latch); - else + if( bt_deletekey (bt, ptr->key, ptr->len, 1) ) return -1; - bt_lockpage(BtLockWrite, set->latch); - - // and pull contents over empty page - // while preserving master's left link - - memcpy (set->page->left, prev->page->left, BtId); - memcpy (prev->page, set->page, bt->mgr->page_size); - - // forward seekers to old right sibling - // to new page location in prev + // perform the remainder of the delete + // from the FIFO queue - bt_putid (set->page->right, prev->latch->page_no); - set->latch->dirty = 1; - set->page->kill = 1; - - // add new fence key for new master page contents, delete - // right sibling after parent posts the new master fence. - - ptr = keyptr(set->page,set->page->cnt); leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; - leaf->entry = set->latch->entry; + leaf->entry = prev->latch->entry; + leaf->nounlock = 1; leaf->type = 2; - // see if right sibling key update is in the FIFO, - // and ParentLock if not there. - - if( !bt_parentmatch (head, leaf->entry) ) - bt_lockpage(BtLockParent, set->latch); - - bt_unlockpage(BtLockWrite, set->latch); - if( tail ) tail->next = leaf; else @@ -2803,25 +2785,10 @@ int type; tail = leaf; - // fix new master's right sibling's left pointer + // leave atomic lock in place until + // deletion completes in next phase. - if( right = bt_getid (set->page->right) ) { - if( set->latch = bt_pinlatch (bt, right, 1) ) - set->page = bt_mappage (bt, set->latch); - - bt_lockpage (BtLockWrite, set->latch); - bt_putid (set->page->left, prev->latch->page_no); - set->latch->dirty = 1; - - bt_unlockpage (BtLockWrite, set->latch); - bt_unpinlatch (set->latch); - } else { // master is now the far right page - bt_spinwritelock (bt->mgr->lock); - bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); - bt_spinreleasewrite(bt->mgr->lock); - } - - bt_unlockpage(BtLockAtomic, latch); + bt_unlockpage(bt, BtLockWrite, prev->latch); } // add & delete keys for any pages split or merged during transaction @@ -2847,18 +2814,15 @@ int type; break; - case 2: // insert key & free - if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) + case 2: // free page + if( bt_atomicfree (bt, set) ) return -1; - bt_lockpage (BtLockDelete, set->latch); - bt_lockpage (BtLockWrite, set->latch); - bt_freepage (bt, set); break; } if( !leaf->nounlock ) - bt_unlockpage (BtLockParent, set->latch); + bt_unlockpage (bt, BtLockParent, set->latch); bt_unpinlatch (set->latch); tail = leaf->next; @@ -2883,9 +2847,9 @@ BtPageSet set[1]; else return 0; - bt_lockpage(BtLockRead, set->latch); + bt_lockpage(bt, BtLockRead, set->latch); memcpy (bt->cursor, set->page, bt->mgr->page_size); - bt_unlockpage(BtLockRead, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); bt->cursor_page = page_no; @@ -2916,9 +2880,9 @@ findourself: else return 0; - bt_lockpage(BtLockRead, set->latch); + bt_lockpage(bt, BtLockRead, set->latch); memcpy (bt->cursor, set->page, bt->mgr->page_size); - bt_unlockpage(BtLockRead, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); next = bt_getid (bt->cursor->right); @@ -2964,11 +2928,11 @@ uid right; else return 0; - bt_lockpage(BtLockRead, set->latch); + bt_lockpage(bt, BtLockRead, set->latch); memcpy (bt->cursor, set->page, bt->mgr->page_size); - bt_unlockpage(BtLockRead, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); slot = 0; @@ -2993,7 +2957,7 @@ uint slot; bt->cursor_page = set->latch->page_no; - bt_unlockpage(BtLockRead, set->latch); + bt_unlockpage(bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); return slot; } @@ -3087,7 +3051,7 @@ uint slot = 0; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->ticket != *latch->parent->serving ) + if( *latch->parent->tid ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no); memset ((ushort *)latch->parent, 0, sizeof(RWLock)); @@ -3120,9 +3084,9 @@ BtKey *ptr; fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no); memset ((ushort *)latch->access, 0, sizeof(RWLock)); - if( *latch->parent->ticket != *latch->parent->serving ) + if( *latch->parent->tid ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no); - memset ((ushort *)latch->parent, 0, sizeof(RWLock)); + memset ((ushort *)latch->parent, 0, sizeof(WOLock)); if( latch->pin ) { fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no); @@ -3277,7 +3241,7 @@ FILE *in; } else if( len < BT_maxkey ) key[len++] = ch; - fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes); + fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found); break; case 'w': @@ -3322,7 +3286,7 @@ FILE *in; set->page = bt_mappage (bt, set->latch); else fprintf(stderr, "unable to obtain latch"), exit(1); - bt_lockpage (BtLockRead, set->latch); + bt_lockpage (bt, BtLockRead, set->latch); next = bt_getid (set->page->right); for( slot = 0; slot++ < set->page->cnt; ) @@ -3341,7 +3305,7 @@ FILE *in; cnt++; } - bt_unlockpage (BtLockRead, set->latch); + bt_unlockpage (bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); } while( page_no = next );