WOLock parent[1]; // Posting of fence key in parent
WOLock atomic[1]; // Atomic update in progress
uint split; // right split page atomic insert
- uint entry; // entry slot in latch table
uint next; // next entry in hash table chain
uint prev; // prev entry in hash table chain
ushort pin; // number of accessing threads
// note that this structure size
// must be a multiple of 8 bytes
-// in order to place dups correctly.
+// in order to place PageZero correctly.
typedef struct BtPage_ {
uint cnt; // count of keys in page
typedef struct {
struct BtPage_ alloc[1]; // next page_no in right ptr
- unsigned long long dups[1]; // global duplicate key uniqueifier
unsigned char freechain[BtId]; // head of free page_nos chain
- unsigned long long activepages; // number of active pages pages
+ unsigned long long activepages; // number of active pages
+ uint redopages; // number of redo pages in file
} BtPageZero;
// The object structure for Btree access
uint latchhash; // number of latch hash table slots
uint latchvictim; // next latch entry to examine
uint latchpromote; // next latch entry to promote
- uint redopages; // size of recovery buff in pages
uint redolast; // last msync size of recovery buff
uint redoend; // eof/end element in recovery buff
int err; // last error
extern void bt_close (BtDb *bt);
extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
-extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit);
+extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
return id;
}
-uid bt_newdup (BtMgr *mgr)
-{
-#ifdef unix
- return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1;
-#else
- return _InterlockedIncrement64(mgr->pagezero->dups, 1);
-#endif
-}
-
// lite weight spin lock Latch Manager
int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
void bt_flushlsn (BtMgr *mgr, ushort thread_no)
{
uint cnt3 = 0, cnt2 = 0, cnt = 0;
+uint entry, segment;
BtLatchSet *latch;
BtPage page;
-uint entry;
// flush dirty pool pages to the btree
bt_lockpage(BtLockRead, latch, thread_no);
if( latch->dirty ) {
- bt_writepage(mgr, page, latch->page_no, 0);
+ bt_writepage(mgr, page, latch->page_no);
latch->dirty = 0, cnt++;
}
if( latch->pin & ~CLOCK_bit )
}
fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
fprintf(stderr, "begin sync");
- sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER);
+ for( segment = 0; segment < mgr->segments; segment++ )
+ if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
fprintf(stderr, " end sync\n");
}
BtKey *key;
BtVal *val;
- pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
-
hdr = (BtLogHdr *)mgr->redobuff;
mgr->flushlsn = hdr->lsn;
}
// recovery manager -- append new entry to recovery log
-// flush to disk when it overflows.
+// flush dirty pages to disk when it overflows.
logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
{
-uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
+uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
uint amt = sizeof(BtLogHdr);
BtLogHdr *hdr, *eof;
uint last, end;
if( amt > size - mgr->redoend ) {
mgr->flushlsn = mgr->lsn;
- msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
+ if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
mgr->redolast = 0;
mgr->redoend = 0;
bt_flushlsn(mgr, thread_no);
memset (eof, 0, sizeof(BtLogHdr));
eof->lsn = mgr->lsn;
- last = mgr->redolast & 0xfff;
+ last = mgr->redolast & ~0xfff;
end = mgr->redoend;
- mgr->redolast = end;
- bt_releasemutex(mgr->redo);
+ if( end - last + sizeof(BtLogHdr) >= 8192 )
+ if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+ else
+ mgr->redolast = end;
- msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
+ bt_releasemutex(mgr->redo);
return hdr->lsn;
}
// recovery manager -- append transaction to recovery log
-// flush to disk when it overflows.
+// flush dirty pages to disk when it overflows.
logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
{
-uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
+uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
uint amt = 0, src, type;
BtLogHdr *hdr, *eof;
uint last, end;
if( amt > size - mgr->redoend ) {
mgr->flushlsn = mgr->lsn;
- msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
+ if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
mgr->redolast = 0;
mgr->redoend = 0;
bt_flushlsn (mgr, thread_no);
memset (eof, 0, sizeof(BtLogHdr));
eof->lsn = lsn;
- last = mgr->redolast & 0xfff;
+ last = mgr->redolast & ~0xfff;
end = mgr->redoend;
- mgr->redolast = end;
- bt_releasemutex(mgr->redo);
- msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
+ if( end - last + sizeof(BtLogHdr) >= 8192 )
+ if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+ else
+ mgr->redolast = end;
+
bt_releasemutex(mgr->redo);
return lsn;
}
BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
{
int flag = PROT_READ | PROT_WRITE;
-uint segment = page_no >> 32;
-unsigned char *perm;
+uint segment = page_no >> 16;
+BtPage perm;
while( 1 ) {
if( segment < mgr->segments ) {
- perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
- memcpy (page, perm, mgr->page_size);
-if( page->page_no != page_no )
+ perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
+if( perm->page_no != page_no )
abort();
+ memcpy (page, perm, mgr->page_size);
mgr->reads++;
return 0;
}
// write page to permanent location in Btree file
// clear the dirty bit
-BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit)
+BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
{
int flag = PROT_READ | PROT_WRITE;
-uint segment = page_no >> 32;
-unsigned char *perm;
+uint segment = page_no >> 16;
+BtPage perm;
while( 1 ) {
if( segment < mgr->segments ) {
- perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
+ perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
+if( page_no > LEAF_page && perm->page_no != page_no)
+abort();
memcpy (perm, page, mgr->page_size);
- if( syncit )
- msync (perm, mgr->page_size, MS_SYNC);
mgr->writes++;
return 0;
}
BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
{
-BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool);
+uid entry = latch - mgr->latchsets;
+BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
return page;
}
// no read-lock is required since page is not pinned.
if( latch->dirty )
- if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
+ if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
return mgr->line = __LINE__, NULL;
else
latch->dirty = 0;
memcpy (page, contents, mgr->page_size);
latch->dirty = 1;
} else if( bt_readpage (mgr, page, page_no) )
- return mgr->line = __LINE__, NULL;
+ return mgr->line = __LINE__, NULL;
// link page as head of hash table chain
// if this is a never before used entry,
latch->pin = CLOCK_bit | 1;
latch->page_no = page_no;
- latch->entry = entry;
latch->split = 0;
bt_releasemutex (latch->modify);
if( mgr->redoend ) {
eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
memset (eof, 0, sizeof(BtLogHdr));
-
- pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
}
// write remaining dirty pool pages to the btree
latch = mgr->latchsets + slot;
if( latch->dirty ) {
- bt_writepage(mgr, page, latch->page_no, 0);
+ bt_writepage(mgr, page, latch->page_no);
latch->dirty = 0, num++;
}
}
- // flush last batch to disk and clear
- // redo recovery buffer on disk.
+ // clear redo recovery buffer on disk.
- fdatasync (mgr->idx);
-
- if( mgr->redopages ) {
+ if( mgr->pagezero->redopages ) {
eof = (BtLogHdr *)mgr->redobuff;
memset (eof, 0, sizeof(BtLogHdr));
eof->lsn = mgr->lsn;
-
- pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
-
- sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+ if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
}
fprintf(stderr, "%d buffer pool pages flushed\n", num);
CloseHandle(mgr->hpool);
#endif
#ifdef unix
- free (mgr->redobuff);
close (mgr->idx);
free (mgr);
#else
memset (pagezero, 0, 1 << bits);
pagezero->alloc->lvl = MIN_lvl - 1;
pagezero->alloc->bits = mgr->page_bits;
- bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
+ pagezero->redopages = redopages;
+
+ bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
pagezero->activepages = 2;
// initialize left-most LEAF page in
// alloc->left and count of active leaf pages.
bt_putid (pagezero->alloc->left, LEAF_page);
+ ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
- ftruncate (mgr->idx, REDO_page << mgr->page_bits);
-
- if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) {
+ if( bt_writepage (mgr, pagezero->alloc, 0) ) {
fprintf (stderr, "Unable to create btree page zero\n");
return bt_mgrclose (mgr), NULL;
}
pagezero->alloc->act = 1;
pagezero->alloc->page_no = MIN_lvl - lvl;
- if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) {
+ if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
fprintf (stderr, "Unable to create btree page\n");
return bt_mgrclose (mgr), NULL;
}
VirtualFree (pagezero, 0, MEM_RELEASE);
#endif
#ifdef unix
- // mlock the pagezero page
+ // mlock the first segment of 64K pages
flag = PROT_READ | PROT_WRITE;
- mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
- if( mgr->pagezero == MAP_FAILED ) {
- fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
+ mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
+ mgr->segments = 1;
+
+ if( mgr->pages[0] == MAP_FAILED ) {
+ fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
return bt_mgrclose (mgr), NULL;
}
+
+ mgr->pagezero = (BtPageZero *)mgr->pages[0];
mlock (mgr->pagezero, mgr->page_size);
+ mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
+ mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
+
mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
if( mgr->pagepool == MAP_FAILED ) {
fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
return bt_mgrclose (mgr), NULL;
}
- if( mgr->redopages = redopages ) {
- ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits);
- mgr->redobuff = valloc (redopages * mgr->page_size);
- }
#else
flag = PAGE_READWRITE;
mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
return bt_mgrclose (mgr), NULL;
}
- if( mgr->redopages = redopages )
- mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
#endif
mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
else
return mgr->line = __LINE__, mgr->err = BTERR_struct;
- bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
mgr->pagezero->activepages++;
+ bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
+ if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
bt_releasemutex(mgr->lock);
+ contents->page_no = page_no;
memcpy (set->page, contents, mgr->page_size);
set->latch->dirty = 1;
return 0;
// extend file into new page.
mgr->pagezero->activepages++;
- contents->page_no = page_no;
-
- pwrite (mgr->idx, contents, mgr->page_size, (uid)(page_no + 1) << mgr->page_bits);
+ if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
bt_releasemutex(mgr->lock);
+ contents->page_no = page_no;
+ pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
+
// don't load cache from btree page, load it from contents
if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
set->page->free = 1;
// decrement active page count
- // and unlock allocation page
mgr->pagezero->activepages--;
- bt_releasemutex (mgr->lock);
+ if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
// unlock released page
+ // and unlock allocation page
bt_unlockpage (BtLockDelete, set->latch);
bt_unlockpage (BtLockWrite, set->latch);
bt_unpinlatch (mgr, set->latch);
+ bt_releasemutex (mgr->lock);
}
// a fence key was deleted from a page
// delete a page and manage keys
// call with page writelocked
+// returns with page removed
+// from the page pool.
+
BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
{
unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
uid page_no;
BtKey *ptr;
+
// cache copy of fence key
// to remove in parent
// pull contents of right peer into our empty page
memcpy (set->page, right->page, mgr->page_size);
+ set->page->page_no = set->latch->page_no;
set->latch->dirty = 1;
// mark right page deleted and point it to left page
bt_freepage (mgr, right);
bt_unlockpage (BtLockParent, set->latch);
+ bt_unpinlatch (mgr, set->latch);
return 0;
}
} else {
set->latch->dirty = 1;
bt_unlockpage(BtLockWrite, set->latch);
+ bt_unpinlatch (mgr, set->latch);
}
- bt_unpinlatch (mgr, set->latch);
return 0;
}
unsigned char value[BtId];
BtPageSet left[1];
uid left_page_no;
+BtPage frame;
BtKey *ptr;
BtVal *val;
+ frame = malloc (mgr->page_size);
+ memcpy (frame, root->page, mgr->page_size);
+
// save left page fence key for new root
ptr = keyptr(root->page, root->page->cnt);
// Obtain an empty page to use, and copy the current
// root contents into it, e.g. lower keys
- if( bt_newpage(mgr, left, root->page, page_no) )
+ if( bt_newpage(mgr, left, frame, page_no) )
return mgr->err;
left_page_no = left->latch->page_no;
bt_unpinlatch (mgr, left->latch);
+ free (frame);
// preserve the page info at the bottom
// of higher keys and set rest to zero
bt_putid(set->page->right, right->latch->page_no);
set->page->min = nxt;
set->page->cnt = idx;
+ free(frame);
- return right->latch->entry;
+ return right->latch - mgr->latchsets;
}
// fix keys for newly split page
set->page->act--;
node->dead = 0;
- mgr->found++;
+ __sync_fetch_and_add(&mgr->found, 1);
return 0;
}
// add entries to redo log
- if( bt->mgr->redopages )
+ if( bt->mgr->pagezero->redopages )
if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
for( src = 0; src++ < source->cnt; )
locks[src].lsn = lsn;
ptr = keyptr(set->page, slot);
if( !samepage ) {
- locks[src].entry = set->latch->entry;
+ locks[src].entry = set->latch - bt->mgr->latchsets;
locks[src].slot = slot;
locks[src].reuse = 0;
} else {
leaf = calloc (sizeof(AtomicKey), 1), que++;
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+ leaf->entry = prev->latch - bt->mgr->latchsets;
leaf->page_no = prev->latch->page_no;
- leaf->entry = prev->latch->entry;
leaf->type = 0;
if( tail )
leaf = calloc (sizeof(AtomicKey), 1), que++;
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+ leaf->entry = prev->latch - bt->mgr->latchsets;
leaf->page_no = prev->latch->page_no;
- leaf->entry = prev->latch->entry;
leaf->type = 0;
if( tail )
leaf = calloc (sizeof(AtomicKey), 1), que++;
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+ leaf->entry = prev->latch - bt->mgr->latchsets;
leaf->page_no = prev->latch->page_no;
- leaf->entry = prev->latch->entry;
leaf->nounlock = 1;
leaf->type = 2;
// is greater than the buffer pool
// promote page into larger btree
- while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
+ if( bt->main )
+ while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
if( bt_txnpromote (bt) )
return bt->mgr->err;
if( !bt_mutextry(set->latch->modify) )
continue;
-// if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
-// bt_releasemutex(set->latch->modify);
-// continue;
-// }
-
// skip this entry if it is pinned
if( set->latch->pin & ~CLOCK_bit ) {
bt_releasemutex(set->latch->modify);
-// bt_releasemutex(bt->mgr->hashtable[idx].latch);
continue;
}
if( !set->latch->page_no || !bt_getid (set->page->right) ) {
bt_releasemutex(set->latch->modify);
-// bt_releasemutex(bt->mgr->hashtable[idx].latch);
continue;
}
if( set->page->lvl || set->page->free || set->page->kill ) {
bt_releasemutex(set->latch->modify);
-// bt_releasemutex(bt->mgr->hashtable[idx].latch);
bt_unlockpage(BtLockWrite, set->latch);
continue;
}
set->latch->pin++;
bt_releasemutex(set->latch->modify);
-// bt_releasemutex(bt->mgr->hashtable[idx].latch);
-
- // if page is dirty, then
- // sync it to the disk first.
-
- if( set->latch->dirty )
- if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) )
- return bt->mgr->line = __LINE__, bt->mgr->err;
- else
- set->latch->dirty = 0;
// transfer slots in our selected page to larger btree
if( !(set->latch->page_no % 100) )
if( bt_deletepage (bt->mgr, set, bt->thread_no) )
return bt->mgr->err;
- bt_unpinlatch (bt->mgr, set->latch);
return 0;
}
}
posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
#endif
fprintf(stderr, "started counting\n");
- next = LEAF_page + bt->mgr->redopages + 1;
+ next = REDO_page + bt->mgr->pagezero->redopages;
while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
if( bt_readpage (bt->mgr, bt->cursor, page_no) )
break;
}
- fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found);
bt_close (bt);
#ifdef unix
return NULL;
HANDLE *threads;
#endif
ThreadArg *args;
+uint redopages = 0;
uint poolsize = 0;
-uint recovery = 0;
uint mainpool = 0;
uint mainbits = 0;
float elapsed;
num = atoi(argv[6]);
if( argc > 7 )
- recovery = atoi(argv[7]);
+ redopages = atoi(argv[7]);
+
+ if( redopages + REDO_page > 65535 )
+ fprintf (stderr, "Warning: Recovery buffer too large\n");
if( argc > 8 )
mainbits = atoi(argv[8]);
#endif
args = malloc ((cnt + 1) * sizeof(ThreadArg));
- mgr = bt_mgr (argv[1], bits, poolsize, recovery);
+ mgr = bt_mgr (argv[1], bits, poolsize, redopages);
if( !mgr ) {
fprintf(stderr, "Index Open Error %s\n", argv[1]);
exit (1);
}
- main = bt_mgr (argv[2], mainbits, mainpool, 0);
+ if( mainbits ) {
+ main = bt_mgr (argv[2], mainbits, mainpool, 0);
- if( !main ) {
- fprintf(stderr, "Index Open Error %s\n", argv[2]);
- exit (1);
- }
+ if( !main ) {
+ fprintf(stderr, "Index Open Error %s\n", argv[2]);
+ exit (1);
+ }
+ } else
+ main = NULL;
// fire off threads
CloseHandle(threads[idx]);
#endif
bt_poolaudit(mgr);
- bt_poolaudit(main);
- bt_mgrclose (main);
+ if( main )
+ bt_poolaudit(main);
+
+ fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
+
+ if( main )
+ bt_mgrclose (main);
bt_mgrclose (mgr);
elapsed = getCpuTime(0) - start;