uint nlatchpage; // number of latch pages at BT_latch
uint latchtotal; // number of page latch entries
uint latchhash; // number of latch hash table slots
- uint latchvictim; // next latch entry to examine
+ uint latchvictim; // next latch hash entry to examine
} BtLatchMgr;
// latch hash table entries
// latch manager table structure
typedef struct {
+ volatile uid page_no; // latch set page number on disk
BtSpinLatch readwr[1]; // read/write page lock
BtSpinLatch access[1]; // Access Intent/Page delete
BtSpinLatch parent[1]; // Posting of fence key in parent
- BtSpinLatch busy[1]; // slot is being moved between chains
+ volatile ushort dirty; // page is dirty in cache
volatile uint next; // next entry in hash table chain
volatile uint prev; // prev entry in hash table chain
- volatile uint hash; // hash slot entry is under
- volatile ushort dirty; // page is dirty in cache
- volatile ushort pin; // number of outstanding pins
- volatile uid page_no; // latch set page number on disk
+ volatile uint pin; // number of outstanding pins
} BtLatchSet;
// The object structure for Btree access
// link latch table entry into head of latch hash table
-BTERR bt_latchlink (BtDb *bt, uint hashidx, uint victim, uid page_no)
+BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no)
{
-BtPage page = (BtPage)(victim * bt->page_size + bt->latchpool);
-BtLatchSet *latch = bt->latchsets + victim;
+BtPage page = (BtPage)(slot * bt->page_size + bt->latchpool);
+BtLatchSet *latch = bt->latchsets + slot;
off64_t off = page_no << bt->page_bits;
uint amt[1];
if( latch->next = bt->table[hashidx].slot )
- bt->latchsets[latch->next].prev = victim;
+ bt->latchsets[latch->next].prev = slot;
- bt->table[hashidx].slot = victim;
+ bt->table[hashidx].slot = slot;
latch->page_no = page_no;
- latch->hash = hashidx;
latch->dirty = 0;
latch->prev = 0;
+ latch->pin = 1;
#ifdef unix
- if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) )
+ if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size )
return bt->err = BTERR_read;
#else
SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
#ifdef unix
__sync_fetch_and_add(&latch->pin, -1);
#else
- _InterlockedDecrement16 (&latch->pin);
+ _InterlockedDecrement (&latch->pin);
#endif
}
BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
{
uint hashidx = page_no % bt->latchmgr->latchhash;
-uint slot, victim, idx;
BtLatchSet *latch;
+uint slot, idx;
off64_t off;
uint amt[1];
BtPage page;
break;
} while( slot = latch->next );
- // found our entry
+ // found our entry, bring to front of hash chain
if( slot ) {
latch = bt->latchsets + slot;
#ifdef unix
__sync_fetch_and_add(&latch->pin, 1);
#else
- _InterlockedIncrement16 (&latch->pin);
+ _InterlockedIncrement (&latch->pin);
#endif
+ // unlink our entry from its hash chain position
+
+ if( latch->prev )
+ bt->latchsets[latch->prev].next = latch->next;
+ else
+ bt->table[hashidx].slot = latch->next;
+
+ if( latch->next )
+ bt->latchsets[latch->next].prev = latch->prev;
+
+ // now link into head of the hash chain
+
+ if( latch->next = bt->table[hashidx].slot )
+ bt->latchsets[latch->next].prev = slot;
+
+ bt->table[hashidx].slot = slot;
+ latch->prev = 0;
+
bt_spinreleasewrite(bt->table[hashidx].latch);
return latch;
}
- // see if there are any unused entries
+ // see if there are any unused pool entries
#ifdef unix
- victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
+ slot = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
#else
- victim = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
+ slot = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
#endif
- if( victim < bt->latchmgr->latchtotal ) {
- latch = bt->latchsets + victim;
-#ifdef unix
- __sync_fetch_and_add(&latch->pin, 1);
-#else
- _InterlockedIncrement16 (&latch->pin);
-#endif
- bt_latchlink (bt, hashidx, victim, page_no);
+ if( slot < bt->latchmgr->latchtotal ) {
+ latch = bt->latchsets + slot;
+ if( bt_latchlink (bt, hashidx, slot, page_no) )
+ return NULL;
bt_spinreleasewrite (bt->table[hashidx].latch);
return latch;
}
#ifdef unix
- victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
+ __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
#else
- victim = _InterlockedDecrement (&bt->latchmgr->latchdeployed);
+ _InterlockedDecrement (&bt->latchmgr->latchdeployed);
#endif
- // find and reuse previous lock entry
+ // find and reuse previous lru lock entry on victim hash chain
while( 1 ) {
#ifdef unix
- victim = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
+ idx = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
#else
- victim = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
+ idx = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
#endif
- // we don't use slot zero
-
- if( victim %= bt->latchmgr->latchtotal )
- latch = bt->latchsets + victim;
- else
- continue;
-
- // take control of our slot
- // from other threads
-
- if( latch->pin || !bt_spinwritetry (latch->busy) )
- continue;
-
- idx = latch->hash;
-
// try to get write lock on hash chain
// skip entry if not obtained
// or has outstanding locks
- if( !bt_spinwritetry (bt->table[idx].latch) ) {
- bt_spinreleasewrite (latch->busy);
+ idx %= bt->latchmgr->latchhash;
+
+ if( !bt_spinwritetry (bt->table[idx].latch) )
continue;
- }
- if( latch->pin ) {
- bt_spinreleasewrite (latch->busy);
+ if( slot = bt->table[idx].slot )
+ while( 1 ) {
+ latch = bt->latchsets + slot;
+ if( !latch->next )
+ break;
+ slot = latch->next;
+ }
+
+ if( !slot || latch->pin ) {
bt_spinreleasewrite (bt->table[idx].latch);
continue;
}
// update permanent page area in btree
- page = (BtPage)(victim * bt->page_size + bt->latchpool);
+ page = (BtPage)(slot * bt->page_size + bt->latchpool);
off = latch->page_no << bt->page_bits;
#ifdef unix
if( latch->dirty )
return bt->err = BTERR_wrt, NULL;
}
#endif
- // unlink our available victim from its hash chain
+ // unlink our available slot from its hash chain
if( latch->prev )
bt->latchsets[latch->prev].next = latch->next;
bt->latchsets[latch->next].prev = latch->prev;
bt_spinreleasewrite (bt->table[idx].latch);
-#ifdef unix
- __sync_fetch_and_add(&latch->pin, 1);
-#else
- _InterlockedIncrement16 (&latch->pin);
-#endif
- bt_latchlink (bt, hashidx, victim, page_no);
+ if( bt_latchlink (bt, hashidx, slot, page_no) )
+ return NULL;
bt_spinreleasewrite (bt->table[hashidx].latch);
- bt_spinreleasewrite (latch->busy);
return latch;
}
}
#endif
#ifdef unix
memset (lock, 0, sizeof(lock));
-
- lock->l_type = F_WRLCK;
lock->l_len = sizeof(struct BtPage_);
- lock->l_whence = 0;
+ lock->l_type = F_WRLCK;
if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
return bt_close (bt), NULL;
ovl->OffsetHigh |= 0x80000000;
- if( LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, len, 0L, ovl) )
+ if( LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) )
return bt_close (bt), NULL;
#endif
// calculate number of latch hash table entries
- nlatchpage = (nodemax/8 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
+ nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
latchhash = nlatchpage * bt->page_size / sizeof(BtHashEntry);
nlatchpage += nodemax; // size of the buffer pool in pages
// else allocate empty page
if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
- latch = bt_pinlatch (bt, new_page);
- temp = bt_mappage (bt, latch);
+ if( latch = bt_pinlatch (bt, new_page) )
+ temp = bt_mappage (bt, latch);
+ else
+ return 0;
bt_putid(bt->latchmgr->alloc[1].right, bt_getid(temp->right));
bt_spinreleasewrite(bt->latchmgr->lock);
// determine lock mode of drill level
mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead;
- bt->latch = bt_pinlatch(bt, page_no);
- bt->page_no = page_no;
+ if( bt->latch = bt_pinlatch(bt, page_no) )
+ bt->page_no = page_no;
+ else
+ return 0;
// obtain access lock using lock chaining
break;
child = bt_getid (slotptr(root, idx)->id);
- latch = bt_pinlatch (bt, child);
+ if( latch = bt_pinlatch (bt, child) )
+ temp = bt_mappage (bt, latch);
+ else
+ return bt->err;
bt_lockpage (BtLockDelete, latch);
bt_lockpage (BtLockWrite, latch);
-
- temp = bt_mappage (bt, latch);
memcpy (root, temp, bt->page_size);
if( bt_update (bt, root, bt->latch) )
// obtain lock on right page
- rlatch = bt_pinlatch (bt, right);
- bt_lockpage(BtLockWrite, rlatch);
+ if( rlatch = bt_pinlatch (bt, right) )
+ temp = bt_mappage (bt, rlatch);
+ else
+ return bt->err;
- temp = bt_mappage (bt, rlatch);
+ bt_lockpage(BtLockWrite, rlatch);
if( temp->kill ) {
bt_abort(bt, temp, right, 0);
// lock right page
- rlatch = bt_pinlatch (bt, right);
- bt_lockpage (BtLockParent, rlatch);
+ if( rlatch = bt_pinlatch (bt, right) )
+ bt_lockpage (BtLockParent, rlatch);
+ else
+ return bt->err;
// update left (containing) node
break;
bt->cursor_page = right;
- latch = bt_pinlatch (bt, right);
- bt_lockpage(BtLockRead, latch);
+
+ if( latch = bt_pinlatch (bt, right) )
+ bt_lockpage(BtLockRead, latch);
+ else
+ return 0;
bt->page = bt_mappage (bt, latch);
memcpy (bt->cursor, bt->page, bt->page_size);
if( idx = bt->table[hashidx].slot ) do {
latch = bt->latchsets + idx;
- if( *(ushort *)latch->busy )
- fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
- *(ushort *)latch->busy = 0;
- if( latch->hash != hashidx )
- fprintf(stderr, "latchset %d wrong hashidx\n", idx);
if( latch->pin )
fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
} while( idx = latch->next );
break;
case 's':
- scan++;
+ fprintf(stderr, "started scaning\n");
+ cnt = len = key[0] = 0;
+
+ if( slot = bt_startkey (bt, key, len) )
+ slot--;
+ else
+ fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
+
+ while( slot = bt_nextkey (bt, slot) ) {
+ ptr = bt_key(bt, slot);
+ fwrite (ptr->key, ptr->len, 1, stdout);
+ fputc ('\n', stdout);
+ cnt++;
+ }
+
+ fprintf(stderr, " Total keys read %d\n", cnt - 1);
+ break;
case 'c':
fprintf(stderr, "started counting\n");
page_no = LEAF_page;
while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
- uid off = page_no << bt->page_bits;
-#ifdef unix
- pread (bt->idx, bt->frame, bt->page_size, off);
-#else
- DWORD amt[1];
-
- SetFilePointer (bt->idx, (long)off, NULL, FILE_BEGIN);
-
- if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL))
- fprintf (stderr, "unable to read page %.8x", page_no);
-
- if( *amt < bt->page_size )
- fprintf (stderr, "unable to read page %.8x", page_no);
-#endif
- if( !bt->frame->free && !bt->frame->lvl )
- cnt += bt->frame->act;
+ BtLatchSet *latch;
+ BtPage page;
+ if( latch = bt_pinlatch (bt, page_no) )
+ page = bt_mappage (bt, latch);
+ if( !page->free && !page->lvl )
+ cnt += page->act;
if( page_no > LEAF_page )
next = page_no + 1;
+ if( scan )
+ for( idx = 0; idx++ < page->cnt; ) {
+ if( slotptr(page, idx)->dead )
+ continue;
+ ptr = keyptr(page, idx);
+ if( idx != page->cnt && bt_getid (page->right) ) {
+ fwrite (ptr->key, ptr->len, 1, stdout);
+ fputc ('\n', stdout);
+ }
+ }
+ bt_unpinlatch (latch);
page_no = next;
}