- }
-
- // new update value doesn't fit in existing value area
-
- if( !slotptr(set->page, slot)->dead )
- set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
- else {
- slotptr(set->page, slot)->dead = 0;
- set->page->act++;
- }
-
- if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
- if( !(entry = bt_splitpage (mgr, set, thread_no)) )
- return mgr->err;
- else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
- return mgr->err;
- else
- continue;
-
- set->page->min -= vallen + sizeof(BtVal);
- val = (BtVal*)((unsigned char *)set->page + set->page->min);
- memcpy (val->value, value, vallen);
- val->len = vallen;
-
- set->latch->dirty = 1;
- set->page->min -= keylen + sizeof(BtKey);
- ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
- memcpy (ptr->key, key, keylen);
- ptr->len = keylen;
-
- slotptr(set->page, slot)->off = set->page->min;
- bt_unlockpage(BtLockWrite, set->latch);
- bt_unpinlatch (mgr, set->latch);
- return 0;
- }
- return 0;
-}
-
-typedef struct {
- logseqno reqlsn; // redo log seq no required
- logseqno lsn; // redo log sequence number
- uint entry; // latch table entry number
- uint slot:31; // page slot number
- uint reuse:1; // reused previous page
-} AtomicTxn;
-
-typedef struct {
- uid page_no; // page number for split leaf
- void *next; // next key to insert
- uint entry:29; // latch table entry number
- uint type:2; // 0 == insert, 1 == delete, 2 == free
- uint nounlock:1; // don't unlock ParentModification
- unsigned char leafkey[BT_keyarray];
-} AtomicKey;
-
-// determine actual page where key is located
-// return slot number
-
-uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
-{
-BtKey *key = keyptr(source,src);
-uint slot = locks[src].slot;
-uint entry;
-
- if( src > 1 && locks[src].reuse )
- entry = locks[src-1].entry, slot = 0;
- else
- entry = locks[src].entry;
-
- if( slot ) {
- set->latch = mgr->latchsets + entry;
- set->page = bt_mappage (mgr, set->latch);
- return slot;
- }
-
- // is locks->reuse set? or was slot zeroed?
- // if so, find where our key is located
- // on current page or pages split on
- // same page txn operations.
-
- do {
- set->latch = mgr->latchsets + entry;
- set->page = bt_mappage (mgr, set->latch);
-
- if( slot = bt_findslot(set->page, key->key, key->len) ) {
- if( slotptr(set->page, slot)->type == Librarian )
- slot++;
- if( locks[src].reuse )
- locks[src].entry = entry;
- return slot;
- }
- } while( entry = set->latch->split );
-
- mgr->line = __LINE__, mgr->err = BTERR_atomic;
- return 0;
-}
-
-BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
-{
-BtKey *key = keyptr(source, src);
-BtVal *val = valptr(source, src);
-BtLatchSet *latch;
-BtPageSet set[1];
-uint entry, slot;
-
- while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
- if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
- if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
- return mgr->err;
- set->page->lsn = locks[src].lsn;
- return 0;
- }
-
- if( entry = bt_splitpage (mgr, set, thread_no) )
- latch = mgr->latchsets + entry;
- else
- return mgr->err;
-
- // splice right page into split chain
- // and WriteLock it.
-
- bt_lockpage(BtLockWrite, latch, thread_no);
- latch->split = set->latch->split;
- set->latch->split = entry;
- locks[src].slot = 0;
- }
-
- return mgr->line = __LINE__, mgr->err = BTERR_atomic;
-}
-
-BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
-{
-BtKey *key = keyptr(source, src);
-BtPageSet set[1];
-uint idx, slot;
-BtKey *ptr;
-BtVal *val;
-
- if( slot = bt_atomicpage (mgr, source, locks, src, set) )
- ptr = keyptr(set->page, slot);
- else
- return mgr->line = __LINE__, mgr->err = BTERR_struct;
-
- if( !keycmp (ptr, key->key, key->len) )
- if( !slotptr(set->page, slot)->dead )
- slotptr(set->page, slot)->dead = 1;
- else
- return 0;
- else
- return 0;
-
- val = valptr(set->page, slot);
- set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
- set->latch->dirty = 1;
- set->page->lsn = locks[src].lsn;
- set->page->act--;
- mgr->found++;
- return 0;
-}
-
-// delete an empty master page for a transaction
-
-// note that the far right page never empties because
-// it always contains (at least) the infinite stopper key
-// and that all pages that don't contain any keys are
-// deleted, or are being held under Atomic lock.
-
-BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
-{
-BtPageSet right[1], temp[1];
-unsigned char value[BtId];
-uid right_page_no;
-BtKey *ptr;
-
- bt_lockpage(BtLockWrite, prev->latch, thread_no);
-
- // grab the right sibling
-
- if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
- right->page = bt_mappage (mgr, right->latch);
- else
- return mgr->err;
-
- bt_lockpage(BtLockAtomic, right->latch, thread_no);
- bt_lockpage(BtLockWrite, right->latch, thread_no);
-
- // and pull contents over empty page
- // while preserving master's left link
-
- memcpy (right->page->left, prev->page->left, BtId);
- memcpy (prev->page, right->page, mgr->page_size);
-
- // forward seekers to old right sibling
- // to new page location in set
-
- bt_putid (right->page->right, prev->latch->page_no);
- right->latch->dirty = 1;
- right->page->kill = 1;
-
- // remove pointer to right page for searchers by
- // changing right fence key to point to the master page
-
- ptr = keyptr(right->page,right->page->cnt);
- bt_putid (value, prev->latch->page_no);
-
- if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, 1, thread_no) )
- return mgr->err;
-
- // now that master page is in good shape we can
- // remove its locks.
-
- bt_unlockpage (BtLockAtomic, prev->latch);
- bt_unlockpage (BtLockWrite, prev->latch);
-
- // fix master's right sibling's left pointer
- // to remove scanner's poiner to the right page
-
- if( right_page_no = bt_getid (prev->page->right) ) {
- if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
- temp->page = bt_mappage (mgr, temp->latch);
-
- bt_lockpage (BtLockWrite, temp->latch, thread_no);
- bt_putid (temp->page->left, prev->latch->page_no);
- temp->latch->dirty = 1;
-
- bt_unlockpage (BtLockWrite, temp->latch);
- bt_unpinlatch (mgr, temp->latch);
- } else { // master is now the far right page
- bt_mutexlock (mgr->lock);
- bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
- bt_releasemutex(mgr->lock);
- }
-
- // now that there are no pointers to the right page
- // we can delete it after the last read access occurs
-
- bt_unlockpage (BtLockWrite, right->latch);
- bt_unlockpage (BtLockAtomic, right->latch);
- bt_lockpage (BtLockDelete, right->latch, thread_no);
- bt_lockpage (BtLockWrite, right->latch, thread_no);
- bt_freepage (mgr, right);
- return 0;
-}
-
-// find and add the next available latch entry
-// to the queue
-
-BTERR bt_txnavaillatch (BtDb *bt)
-{
-BtLatchSet *latch;
-uint startattempt;
-uint cnt, entry;
-uint hashidx;
-BtPage page;
-
- // find and reuse previous entry on victim
-
- startattempt = bt->mgr->latchvictim;
-
- while( 1 ) {
-#ifdef unix
- entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
-#else
- entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
-#endif
- // skip entry if it has outstanding pins
-
- entry %= bt->mgr->latchtotal;
-
- if( !entry )
- continue;
-
- // only go around one time before
- // flushing redo recovery buffer,
- // and the buffer pool to free up entries.
-
- if( bt->mgr->redopages )
- if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
- if( bt_mutextry (bt->mgr->dump) ) {
- if( bt_dumpredo (bt->mgr) )
- return bt->mgr->err;
- bt_flushlsn (bt->mgr, bt->thread_no);
- // synchronize the various threads running into this condition
- // so that only one thread does the dump and flush
- } else
- bt_mutexlock(bt->mgr->dump);
-
- startattempt = bt->mgr->latchvictim;
- bt_releasemutex(bt->mgr->dump);
- }
-
- latch = bt->mgr->latchsets + entry;
-
- if( latch->avail )
- continue;
-
- bt_mutexlock(latch->modify);
-
- // skip if already an available entry
-
- if( latch->avail ) {
- bt_releasemutex(latch->modify);
- continue;
- }
-
- // skip this entry if it is pinned
- // if the CLOCK bit is set
- // reset it to zero.