RWLock readwr[1]; // read/write page lock
RWLock access[1]; // Access Intent/Page delete
RWLock parent[1]; // Posting of fence key in parent
- volatile uint next; // next entry in hash table chain
- volatile uint prev; // prev entry in hash table chain
- volatile ushort pin; // number of outstanding latches
+ uint slot; // entry slot in latch table
+ uint next; // next entry in hash table chain
+ uint prev; // prev entry in hash table chain
+ volatile ushort pin; // number of outstanding threads
ushort dirty:1; // page in cache is dirty
} BtLatchSet;
BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
{
-BtPage page = (BtPage)((uid)slot * bt->mgr->page_size + bt->mgr->pagepool);
+BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
BtLatchSet *latch = bt->mgr->latchsets + slot;
if( latch->next = bt->mgr->hashtable[hashidx].slot )
bt->mgr->latchsets[latch->next].prev = slot;
bt->mgr->hashtable[hashidx].slot = slot;
- latch->pin = CLOCK_bit | 1; // initialize clock
latch->page_no = page_no;
+ latch->slot = slot;
latch->prev = 0;
+ latch->pin = 1;
if( loadit )
if( bt->err = bt_readpage (bt->mgr, page, page_no) )
return bt->err = 0;
}
-// release latch pin
+// set CLOCK bit in latch
+// decrement pin count
-void bt_unpinlatch (BtLatchSet *set)
+void bt_unpinlatch (BtLatchSet *latch)
{
#ifdef unix
- __sync_fetch_and_add(&set->pin, -1);
+ if( ~latch->pin & CLOCK_bit )
+ __sync_fetch_and_or(&latch->pin, CLOCK_bit);
+ __sync_fetch_and_add(&latch->pin, -1);
#else
- _InterlockedDecrement16 (&set->pin);
+ if( ~latch->pin & CLOCK_bit )
+ _InterlockedOr16 (&latch->pin, CLOCK_bit);
+ _InterlockedDecrement16 (&latch->pin);
#endif
}
-// map the btree cached page onto current page
+// return the btree cached page address
BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
{
-BtPage page = (BtPage)((uid)(latch - bt->mgr->latchsets) * bt->mgr->page_size + bt->mgr->pagepool);
+BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool);
return page;
}
latch = bt->mgr->latchsets + slot;
#ifdef unix
__sync_fetch_and_add(&latch->pin, 1);
- __sync_fetch_and_or(&latch->pin, CLOCK_bit);
#else
_InterlockedIncrement16 (&latch->pin);
- _InterlockedOr16 (&latch->pin, CLOCK_bit);
#endif
bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
return latch;
if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
continue;
- if( latch->pin & CLOCK_bit ) {
+ // skip this slot if it is pinned
+ // or the CLOCK bit is set
+
+ if( latch->pin ) {
+ if( latch->pin & CLOCK_bit ) {
#ifdef unix
- __sync_fetch_and_add(&latch->pin, -CLOCK_bit);
+ __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
#else
- _InterlockedExchangeAdd16 (&latch->pin, -CLOCK_bit);
+ _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
#endif
+ }
bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
continue;
}
- // update permanent page area in btree
+ // update permanent page area in btree from buffer pool
- page = (BtPage)((uid)slot * bt->mgr->page_size + bt->mgr->pagepool);
+ page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
if( latch->dirty )
if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
// flush dirty pool pages to the btree
for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
- page = (BtPage)((uid)slot * mgr->page_size + mgr->pagepool);
+ page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
latch = mgr->latchsets + slot;
if( latch->dirty ) {
bt_writepage(mgr, page, latch->page_no);
latch->dirty = 0, num++;
}
+// madvise (page, mgr->page_size, MADV_DONTNEED);
}
fprintf(stderr, "%d buffer pool pages flushed\n", num);
#ifdef unix
- munmap (mgr->hashtable, mgr->nlatchpage * mgr->page_size);
+ munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
munmap (mgr->pagezero, mgr->page_size);
#else
FlushViewOfFile(mgr->pagezero, 0);
// calculate number of latch hash table entries
mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
- mgr->latchhash = mgr->nlatchpage * mgr->page_size / sizeof(BtHashEntry);
+ mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
mgr->nlatchpage += nodemax; // size of the buffer pool in pages
mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
// mlock the pagezero page
flag = PROT_READ | PROT_WRITE;
- mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
+ mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
if( mgr->pagezero == MAP_FAILED ) {
fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
return bt_mgrclose (mgr), NULL;
}
mlock (mgr->pagezero, mgr->page_size);
- mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage * mgr->page_size, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
+ mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
if( mgr->hashtable == MAP_FAILED ) {
fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
return bt_mgrclose (mgr), NULL;
}
#endif
- mgr->pagepool = (unsigned char *)mgr->hashtable + (uid)(mgr->nlatchpage - mgr->latchtotal) * mgr->page_size;
+ mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
return mgr;
while( cnt++ < max ) {
if( cnt == slot )
newslot = idx + 2;
- if( cnt < max && slotptr(bt->frame,cnt)->dead )
+
+ if( cnt < max || page->lvl )
+ if( slotptr(bt->frame,cnt)->dead )
continue;
// copy the value across
// make a librarian slot
- if( idx ) {
- slotptr(page, ++idx)->off = nxt;
- slotptr(page, idx)->type = Librarian;
- slotptr(page, idx)->dead = 1;
- }
+ slotptr(page, ++idx)->off = nxt;
+ slotptr(page, idx)->type = Librarian;
+ slotptr(page, idx)->dead = 1;
// set up the slot
idx = 0;
while( cnt++ < max ) {
- if( slotptr(set->page, cnt)->dead && cnt < max )
+ if( cnt < max || set->page->lvl )
+ if( slotptr(set->page, cnt)->dead )
continue;
+
src = valptr(set->page, cnt);
nxt -= src->len + sizeof(BtVal);
memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
// add librarian slot
- if( idx ) {
- slotptr(bt->frame, ++idx)->off = nxt;
- slotptr(bt->frame, idx)->type = Librarian;
- slotptr(bt->frame, idx)->dead = 1;
- }
+ slotptr(bt->frame, ++idx)->off = nxt;
+ slotptr(bt->frame, idx)->type = Librarian;
+ slotptr(bt->frame, idx)->dead = 1;
// add actual slot
// add librarian slot
- if( idx ) {
- slotptr(set->page, ++idx)->off = nxt;
- slotptr(set->page, idx)->type = Librarian;
- slotptr(set->page, idx)->dead = 1;
- }
+ slotptr(set->page, ++idx)->off = nxt;
+ slotptr(set->page, idx)->type = Librarian;
+ slotptr(set->page, idx)->dead = 1;
// add actual slot
// insert new fence for reformulated left block of smaller keys
+ ptr = (BtKey*)fencekey;
bt_putid (value, set->page_no);
- if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
+ if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
return bt->err;
// switch fence for right block of larger keys to new right page
+ ptr = (BtKey*)rightkey;
bt_putid (value, right->page_no);
- if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
+ if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
return bt->err;
bt_unlockpage (BtLockParent, set->latch);