} BtLock;
typedef struct {
- volatile uint value[1];
-} BtMutexLatch;
+ union {
+ struct {
+ volatile unsigned char xcl[1];
+ volatile unsigned char filler;
+ volatile ushort waiters[1];
+ } bits[1];
+ uint value[1];
+ };
+} MutexLatch;
// definition for reader/writer reentrant lock implementation
typedef struct {
- BtMutexLatch xcl[1];
- BtMutexLatch wrt[1];
+ MutexLatch xcl[1];
+ MutexLatch wrt[1];
uint readers;
ushort dup; // re-entrant locks
ushort tid; // owner thread-no
// hash table entries
typedef struct {
- BtMutexLatch latch[1];
+ MutexLatch latch[1];
uint entry; // Latch table entry at head of chain
} BtHashEntry;
typedef struct {
uid page_no; // latch set page number
- BtMutexLatch modify[1]; // modify entry lite latch
+ MutexLatch modify[1]; // modify entry lite latch
RWLock readwr[1]; // read/write page lock
RWLock access[1]; // Access Intent/Page delete
RWLock parent[1]; // Posting of fence key in parent
uint split; // right split page atomic insert
uint next; // next entry in hash table chain
uint prev; // prev entry in hash table chain
- ushort pin; // number of accessing threads
+ volatile ushort pin; // number of accessing threads
unsigned char dirty; // page in cache is dirty
unsigned char leaf; // page in cache is a leaf
} BtLatchSet;
typedef enum {
Unique,
+ Update,
Librarian,
Duplicate,
Delete
unsigned char *leafpool; // mapped to the leaf pool pages
unsigned char *redobuff; // mapped recovery buffer pointer
logseqno lsn, flushlsn; // current & first lsn flushed
- BtMutexLatch redo[1]; // redo area lite latch
- BtMutexLatch lock[1]; // allocation area lite latch
- BtMutexLatch maps[1]; // mapping segments lite latch
+ MutexLatch redo[1]; // redo area lite latch
+ MutexLatch lock[1]; // allocation area lite latch
+ MutexLatch maps[1]; // mapping segments lite latch
ushort thread_no[1]; // highest thread number issued
ushort err_thread; // error thread number
uint nlatchpage; // size of buffer pool & latchsets
HANDLE halloc; // allocation handle
HANDLE hpool; // buffer pool handle
#endif
- uint segments; // number of memory mapped segments
+ volatile uint segments; // number of memory mapped segments
unsigned char *pages[64000];// memory mapped segments of b-tree
} BtMgr;
{
return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
}
-/*
-void bt_mutexlock(BtMutexLatch *latch)
-{
-uint idx = 200;
-
- while( __sync_val_compare_and_swap (latch->value, 0, 1) )
- if( !idx )
- sched_yield ();
- else
- idx--;
-}
-int bt_mutextry(BtMutexLatch *latch)
-{
- return !__sync_val_compare_and_swap (latch->value, 0, 1);
-}
-
-void bt_releasemutex(BtMutexLatch *latch)
-{
- if( !__sync_lock_test_and_set (latch->value, 0) )
- abort();
-}
-*/
-void bt_mutexlock(BtMutexLatch *latch)
+void bt_mutexlock(MutexLatch *latch)
{
-uint idx;
+uint idx, waited = 0;
+MutexLatch prev[1];
- for( idx = 0; idx < 100; idx++ )
- if( !__sync_val_compare_and_swap (latch->value, 0, 1) )
+ while( 1 ) {
+ for( idx = 0; idx < 100; idx++ ) {
+ *prev->value = __sync_fetch_and_or (latch->value, 1);
+ if( !*prev->bits->xcl ) {
+ if( waited )
+ __sync_fetch_and_sub (latch->bits->waiters, 1);
return;
+ }
+ }
- while( __sync_lock_test_and_set (latch->value, 2) )
- sys_futex ((uint *)latch->value, FUTEX_WAIT_PRIVATE, 2, NULL, NULL, 0);
+ if( !waited ) {
+ __sync_fetch_and_add (latch->bits->waiters, 1);
+ *prev->bits->waiters += 1;
+ waited++;
+ }
+
+ sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
+ }
}
-int bt_mutextry(BtMutexLatch *latch)
+int bt_mutextry(MutexLatch *latch)
{
- return !__sync_val_compare_and_swap (latch->value, 0, 1);
+ return !__sync_lock_test_and_set (latch->bits->xcl, 1);
}
-void bt_releasemutex(BtMutexLatch *latch)
+void bt_releasemutex(MutexLatch *latch)
{
-uint idx;
-
- if( *latch->value == 2 )
- *latch->value = 0;
- else if( __sync_lock_test_and_set (latch->value, 0) == 1 )
- return;
+ *latch->bits->xcl = 0;
- if( latch->value[0] )
- if( __sync_val_compare_and_swap (latch->value, 1, 2) )
- return;
-
-// for( idx = 0; idx < 200; idx++ )
-// if( latch->value[0] )
-// if( __sync_val_compare_and_swap (latch->value, 1, 2) )
-// return;
-
- sys_futex( (uint *)latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
+ if( *latch->bits->waiters )
+ sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
}
// reader/writer lock implementation
bt_mutexlock (lock->xcl);
bt_mutexlock (lock->wrt);
bt_releasemutex (lock->xcl);
-//if( lock->tid )
-//abort();
+
lock->line = line;
lock->tid = tid;
}
segment++, fragment = 0;
if( segment < mgr->segments ) {
- perm = mgr->pages[segment] + (fragment << mgr->page_bits);
+ perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
memcpy ((unsigned char *)page + off, perm, mgr->page_size);
off += mgr->page_size;
bt_releasemutex (mgr->maps);
}
-//if( !leaf && !page->lvl )
-//abort();
-//if( leaf && page->lvl )
-//abort();
return 0;
}
uint off = 0, segment, fragment;
unsigned char *perm;
-//if( !leaf && !page->lvl )
-//abort();
-//if( leaf && page->lvl )
-//abort();
-//if( !page->lvl && mgr->leaf_xtra == 8 )
-//fprintf(stderr, "WrPage %d\n", (uint)page_no);
if( leaf )
page_size <<= mgr->leaf_xtra;
segment++, fragment = 0;
if( segment < mgr->segments ) {
- perm = mgr->pages[segment] + (fragment << mgr->page_bits);
+ perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
memcpy (perm, (unsigned char *)page + off, mgr->page_size);
off += mgr->page_size;
fragment++;
// set CLOCK bit in latch
// decrement pin count
+int value;
+
void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
{
bt_mutexlock(latch->modify);
-//if( !(latch->pin & ~CLOCK_bit) )
-//abort();
latch->pin |= CLOCK_bit;
latch->pin--;
uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
BtPage page;
- if( latch->leaf )
- page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
- else
- page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
-//if( latch->leaf )
-//if( page->lvl )
-//abort();
- return page;
+ if( latch->leaf )
+ page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
+ else
+ page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
+
+ return page;
}
// return next available leaf entry
bt_mutexlock(mgr->leaftable[hashidx].latch);
- if( entry = mgr->leaftable[hashidx].entry ) do
- {
+ if( entry = mgr->leaftable[hashidx].entry )
+ do {
latch = mgr->leafsets + entry;
+
if( page_no == latch->page_no )
break;
- } while( entry = latch->next );
+ } while( entry = latch->next );
// found our entry: increment pin
bt_mutexlock(latch->modify);
latch->pin |= CLOCK_bit;
latch->pin++;
-//if( !latch->leaf )
-//abort();
+
bt_releasemutex(latch->modify);
bt_releasemutex(mgr->leaftable[hashidx].latch);
return latch;
// find and reuse unpinned entry
trynext:
-
entry = bt_availleaf (mgr);
latch = mgr->leafsets + entry;
-//if( latch->page_no )
-//if( !latch->leaf )
-//abort();
idx = latch->page_no % mgr->leafhash;
// update permanent page area in btree from buffer pool
// no read-lock is required since page is not pinned.
-//if( latch->page_no )
-//if( !latch->leaf )
-//abort();
if( latch->dirty )
if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
if( entry = mgr->hashtable[hashidx].entry ) do
{
latch = mgr->latchsets + entry;
+
if( page_no == latch->page_no )
break;
} while( entry = latch->next );
// find and reuse unpinned entry
trynext:
-
entry = bt_availnext (mgr);
latch = mgr->latchsets + entry;
// update permanent page area in btree from buffer pool
// no read-lock is required since page is not pinned.
-//if( latch->page_no )
-//if( latch->leaf )
-//abort();
if( latch->dirty )
if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
bt_lockpage(mode, set->latch, thread_no, __LINE__);
+ // grab our fence key
+
+ ptr=keyptr(set->page,set->page->cnt);
+
if( set->page->free )
return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
prevpage = set->page;
prevmode = mode;
+ // if requested key is beyond our fence,
+ // slide to the right
+
+ if( keycmp (ptr, key, len) < 0 )
+ if( page_no = bt_getid(set->page->right) )
+ continue;
+
+ // if page is part of a delete operation,
+ // slide to the left;
+
+ if( set->page->kill ) {
+ bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
+ page_no = bt_getid(set->page->left);
+ bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
+ continue;
+ }
+
// find key on page at this level
// and descend to requested level
- if( !set->page->kill )
- if( slot = bt_findslot (set->page, key, len) ) {
+ if( slot = bt_findslot (set->page, key, len) ) {
if( drill == lvl )
return slot;
val = valptr(set->page, slot);
if( val->len == BtId )
- page_no = bt_getid(valptr(set->page, slot)->value);
+ page_no = bt_getid(val->value);
else
return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
// slide right into next page
- bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
page_no = bt_getid(set->page->right);
- bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
} while( page_no );
{
unsigned char *freechain;
-//if( (set->latch->pin & ~CLOCK_bit) > 1 )
-//abort();
if( set->page->lvl )
freechain = mgr->pagezero->freechain;
else {
// returns with page unpinned
// from the page pool.
-BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
+BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
{
-unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
+unsigned char lowerfence[BT_keyarray];
uint page_size = mgr->page_size;
+BtPageSet right[1], temp[1];
unsigned char value[BtId];
-uint lvl = set->page->lvl;
-BtPageSet right[1];
-uid page_no;
+uid page_no, right2;
BtKey *ptr;
if( !lvl )
page_size <<= mgr->leaf_xtra;
- // cache copy of fence key
- // to remove in parent
+ // cache copy of original fence key
+ // that is being deleted.
ptr = keyptr(set->page, set->page->cnt);
memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
right->page = bt_mappage (mgr, right->latch);
else
return 0;
-//if( right->page->lvl )
-//abort();
- bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
-
- // cache copy of key to update
- ptr = keyptr(right->page, right->page->cnt);
- memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
+ bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
if( right->page->kill )
return mgr->line = __LINE__, mgr->err = BTERR_struct;
// pull contents of right peer into our empty page
+ // preserving our left page number.
- memcpy (right->page->left, set->page->left, BtId);
+ bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
+ page_no = bt_getid(set->page->left);
memcpy (set->page, right->page, page_size);
+ bt_putid (set->page->left, page_no);
+ bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
+
set->page->page_no = set->latch->page_no;
set->latch->dirty = 1;
- // mark right page deleted and point it to left page
- // until we can post parent updates that remove access
- // to the deleted page.
+ if( right2 = bt_getid (right->page->right) ) {
+ if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
+ temp->page = bt_mappage (mgr, temp->latch);
+ else
+ return 0;
- bt_putid (right->page->right, set->latch->page_no);
- right->latch->dirty = 1;
- right->page->kill = 1;
+ bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
+ bt_putid (temp->page->left, set->latch->page_no);
+ bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
+ bt_unpinlatch (temp->latch, thread_no, __LINE__);
+ } else { // page is rightmost
+ bt_mutexlock (mgr->lock);
+ bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
+ bt_releasemutex(mgr->lock);
+ }
- bt_lockpage (BtLockParent, right->latch, thread_no, __LINE__);
- bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
+ // mark right page deleted and release lock
- bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
- bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
+ right->latch->dirty = 1;
+ right->page->kill = 1;
// redirect higher key directly to our new node contents
+ ptr = keyptr(right->page, right->page->cnt);
bt_putid (value, set->latch->page_no);
- ptr = (BtKey*)higherfence;
- if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
+ if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
return mgr->err;
- // delete old lower key to our node
+ bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
+
+ // delete orignal fence key in parent
+ // and unlock our page.
- ptr = (BtKey*)lowerfence;
+ ptr = (BtKey *)lowerfence;
if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
return mgr->err;
+ bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, thread_no, __LINE__);
+
// obtain delete and write locks to right node
- bt_unlockpage (BtLockParent, right->latch, thread_no, __LINE__);
bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
-while( (right->latch->pin & ~CLOCK_bit) > 1 )
-sched_yield();
bt_freepage (mgr, right, thread_no);
-//fprintf(stderr, "DelPage %d by %d at %d\n", (uint)right->latch->page_no, thread_no, __LINE__);
- bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
return 0;
}
// delete empty page
if( !set->page->act )
- return bt_deletepage (mgr, set, thread_no);
+ return bt_deletepage (mgr, set, thread_no, set->page->lvl);
set->latch->dirty = 1;
bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
// return pool entry for new right
// page, pinned & unlocked
-uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
+uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
{
uint page_size = mgr->page_size;
+BtPageSet right[1], temp[1];
uint cnt = 0, idx = 0, max;
uint lvl = set->page->lvl;
-BtPageSet right[1];
BtKey *key, *ptr;
BtVal *val, *src;
BtPage frame;
uid right2;
+uint entry;
uint prev;
if( !set->page->lvl )
// link right node
- if( set->latch->page_no > ROOT_page )
- bt_putid (frame->right, bt_getid (set->page->right));
+ if( set->latch->page_no > ROOT_page ) {
+ right2 = bt_getid (set->page->right);
+ bt_putid (frame->right, right2);
+ bt_putid (frame->left, set->latch->page_no);
+ }
// get new free page and write higher keys to it.
if( bt_newpage(mgr, right, frame, thread_no) )
return 0;
+ // link far right's left pointer to new page
+
+ if( linkleft && set->latch->page_no > ROOT_page )
+ if( right2 ) {
+ if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
+ temp->page = bt_mappage (mgr, temp->latch);
+ else
+ return 0;
+
+ bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
+ bt_putid (temp->page->left, right->latch->page_no);
+ bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
+ bt_unpinlatch (temp->latch, thread_no, __LINE__);
+ } else { // page is rightmost
+ bt_mutexlock (mgr->lock);
+ bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
+ bt_releasemutex(mgr->lock);
+ }
+
// process lower keys
memcpy (frame, set->page, page_size);
cnt = 0;
idx = 0;
- if( slotptr(frame, max)->type == Librarian )
- max--;
-
// assemble page of smaller keys
while( cnt++ < max ) {
set->page->cnt = idx;
free(frame);
- return right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
+ entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
+ return entry;
}
// fix keys for newly split page
BtPage page;
BtKey *ptr;
-//if( !lvl )
-//abort();
// if current page is the root page, split it
if( set->latch->page_no == ROOT_page )
BtVal *val;
int rate;
- // if found slot > desired slot and previous slot
- // is a librarian slot, use it
+ // if previous slot is a librarian slot, use it
if( slot > 1 )
if( slotptr(set->page, slot-1)->type == Librarian )
// if librarian slot == found slot, advance to real slot
- if( node->type == Librarian )
- if( !keycmp (ptr, key, keylen) ) {
+ if( node->type == Librarian ) {
+// if( !keycmp (ptr, key, keylen) ) {
ptr = keyptr(set->page, ++slot);
node = slotptr(set->page, slot);
}
switch( type ) {
case Unique:
case Duplicate:
- if( keycmp (ptr, key, keylen) )
- if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) ) {
+ if( !keycmp (ptr, key, keylen) )
+ break;
+
+ if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
return mgr->err;
+ else
+ goto insxit;
- bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
- return 0;
- } else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
- return mgr->err_thread = thread_no, mgr->err;
- else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
- return mgr->err_thread = thread_no, mgr->err;
- else
- continue;
-
- // if key already exists, update value and return
+ if( entry = bt_splitpage (mgr, set, thread_no, 1) )
+ if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
+ continue;
- val = valptr(set->page, slot);
+ return mgr->err_thread = thread_no, mgr->err;
- if( val->len >= vallen ) {
- if( slotptr(set->page, slot)->dead )
- set->page->act++;
- node->type = type;
- node->dead = 0;
+ case Update:
+ if( keycmp (ptr, key, keylen) )
+ goto insxit;
- set->page->garbage += val->len - vallen;
- set->latch->dirty = 1;
- val->len = vallen;
- memcpy (val->value, value, vallen);
- bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
- return 0;
- }
+ break;
+ }
- // new update value doesn't fit in existing value area
- // make sure page has room
+ // if key already exists, update value and return
- if( !node->dead )
- set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
- else
- set->page->act++;
+ val = valptr(set->page, slot);
+ if( val->len >= vallen ) {
+ if( node->dead )
+ set->page->act++;
node->type = type;
node->dead = 0;
- if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
- if( !(entry = bt_splitpage (mgr, set, thread_no)) )
- return mgr->err_thread = thread_no, mgr->err;
- else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
- return mgr->err_thread = thread_no, mgr->err;
- else
+ set->page->garbage += val->len - vallen;
+ set->latch->dirty = 1;
+ val->len = vallen;
+ memcpy (val->value, value, vallen);
+ goto insxit;
+ }
+
+ // new update value doesn't fit in existing value area
+ // make sure page has room
+
+ if( !node->dead )
+ set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
+ else
+ set->page->act++;
+
+ node->type = type;
+ node->dead = 0;
+
+ if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
+ break;
+
+ if( entry = bt_splitpage (mgr, set, thread_no, 1) )
+ if( !bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
continue;
- // copy key and value onto page and update slot
+ return mgr->err_thread = thread_no, mgr->err;
+ }
+
+ // copy key and value onto page and update slot
- set->page->min -= vallen + sizeof(BtVal);
- val = (BtVal*)((unsigned char *)set->page + set->page->min);
- memcpy (val->value, value, vallen);
- val->len = vallen;
+ set->page->min -= vallen + sizeof(BtVal);
+ val = (BtVal*)((unsigned char *)set->page + set->page->min);
+ memcpy (val->value, value, vallen);
+ val->len = vallen;
- set->latch->dirty = 1;
- set->page->min -= keylen + sizeof(BtKey);
- ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
- memcpy (ptr->key, key, keylen);
- ptr->len = keylen;
+ set->latch->dirty = 1;
+ set->page->min -= keylen + sizeof(BtKey);
+ ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
+ memcpy (ptr->key, key, keylen);
+ ptr->len = keylen;
- node->off = set->page->min;
- bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
- bt_unpinlatch (set->latch, thread_no, __LINE__);
- return 0;
- }
- }
+ slotptr(set->page,slot)->off = set->page->min;
+
+insxit:
+ bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
+ bt_unpinlatch (set->latch, thread_no, __LINE__);
return 0;
}
// split page
- if( entry = bt_splitpage (mgr, set, thread_no) )
+ if( entry = bt_splitpage (mgr, set, thread_no, 0) )
latch = mgr->leafsets + entry;
else
return mgr->err_thread = thread_no, mgr->err;
// splice right page into split chain
// and WriteLock it
-//fprintf(stderr, "SplitPg %d by %d at %d\n", (uint)latch->page_no, thread_no, __LINE__);
bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
latch->split = set->latch->split;
set->latch->split = entry;
// promote page into larger btree
if( bt->main )
- while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->thread_no )
+ while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
if( bt_txnpromote (bt) )
return bt->mgr->err;
BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
{
-unsigned char fencekey[BT_keyarray], prvfence[BT_keyarray];
uint src, idx, samepage, entry;
BtPageSet set[1], prev[1];
unsigned char value[BtId];
BtLatchSet *latch;
uid right_page_no;
AtomicTxn *locks;
-BtKey *key, *ptr, *prv, *cur;
+BtKey *key, *ptr;
BtPage page;
BtVal *val;
-uint slot, prvslot;
-
locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
// Load the leaf page for each key
samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
if( !samepage )
- if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
-//memcpy (prvfence, fencekey, 10),
-//src>1 ? memcpy (fencekey, ptr->key, 10):NULL,
-//prv = ptr,
-//cur = keyptr(set->page, slot),
+ if( bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
else
return mgr->err;
entry = set->latch - mgr->leafsets;
-
- // is this actually on the same page?
-
-//if( !samepage )
-// for( idx = src; --idx; )
-//if( entry == locks[idx].entry ) {
-//fprintf(stderr, "Dup page %d by thread %d\n", (uint)set->latch->page_no, thread_no);
-// abort();
-// }
-
locks[src].reuse = samepage;
locks[src].entry = entry;
ptr = keyptr(prev->page,prev->page->cnt);
bt_putid (value, prev->latch->page_no);
-//fprintf(stderr, "KeyIns for %d by %d at %d\n", (uint)prev->latch->page_no, thread_no, __LINE__);
+
if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
return mgr->err;
ptr = keyptr(prev->page,prev->page->cnt);
bt_putid (value, prev->latch->page_no);
-//fprintf(stderr, "KeyIns for %d by %d at %d\n", (uint)prev->latch->page_no, thread_no, __LINE__);
- if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
+ if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
return mgr->err;
if( lsm )
// any and all splits were reversed, and the
// master page located in prev is empty, delete it
- if( bt_deletepage (mgr, prev, thread_no) )
+ if( bt_deletepage (mgr, prev, thread_no, 0) )
return mgr->err;
}
set->latch = bt->mgr->leafsets + entry;
+ // skip this entry if it has never been used
+
+ if( !set->latch->page_no )
+ continue;
+
if( !bt_mutextry(set->latch->modify) )
continue;
set->page = bt_mappage (bt->mgr, set->latch);
- // entry never used or has no left or right sibling
+ // entry has no right sibling
- if( !set->latch->page_no || !bt_getid (set->page->right) ) {
+ if( !bt_getid (set->page->right) ) {
bt_releasemutex(set->latch->modify);
continue;
}
// now delete the page
- if( bt_deletepage (bt->mgr, set, bt->thread_no) )
+ if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
return bt->mgr->err;
}
#endif
-void bt_poolaudit (BtMgr *mgr)
+void bt_poolaudit (BtMgr *mgr, char *type)
{
-BtLatchSet *latch;
-uint entry = 0;
+BtLatchSet *latch, test[1];
+uint entry;
+
+ memset (test, 0, sizeof(test));
+
+ if( memcmp (test, mgr->latchsets, sizeof(test)) )
+ fprintf(stderr, "%s latchset zero overwritten\n", type);
+
+ if( memcmp (test, mgr->leafsets, sizeof(test)) )
+ fprintf(stderr, "%s leafset zero overwritten\n", type);
- while( ++entry < mgr->latchtotal ) {
+ for( entry = 0; ++entry < mgr->latchtotal; ) {
latch = mgr->latchsets + entry;
- if( *latch->readwr->wrt->value )
- fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
+ if( *latch->modify->value )
+ fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
-// if( *latch->access->bits->tid )
-// fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
+ if( latch->pin & ~CLOCK_bit )
+ fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
+ }
-// if( *latch->parent->bits->tid )
-// fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
+ for( entry = 0; ++entry < mgr->leaftotal; ) {
+ latch = mgr->leafsets + entry;
if( *latch->modify->value )
- fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
+ fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
if( latch->pin & ~CLOCK_bit )
- fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
+ fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
}
}
for( idx = 0; idx < cnt; idx++ )
CloseHandle(threads[idx]);
#endif
- bt_poolaudit(mgr);
+ bt_poolaudit(mgr, "cache");
if( main )
- bt_poolaudit(main);
+ bt_poolaudit(main, "main");
- fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
+ fprintf(stderr, "cache %d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
+ if( main )
+ fprintf(stderr, "main %d reads %d writes %d found\n", main->reads, main->writes, main->found);
if( main )
bt_mgrclose (main);