X-Git-Url: https://pd.if.org/git/?p=btree;a=blobdiff_plain;f=threadskv10.c;fp=threadskv10.c;h=62ee44e3d69fee2b61f67b2cd60044f4b08761a4;hp=cd307f6e9a24b60340f14737e0ee42d0a675ebb8;hb=57594095c9e98ec77e532919b42f277091d2a00e;hpb=172ab9f8496947eb879883777aeaf3f4b17f3522 diff --git a/threadskv10.c b/threadskv10.c index cd307f6..62ee44e 100644 --- a/threadskv10.c +++ b/threadskv10.c @@ -170,7 +170,6 @@ typedef struct { WOLock parent[1]; // Posting of fence key in parent WOLock atomic[1]; // Atomic update in progress uint split; // right split page atomic insert - uint entry; // entry slot in latch table uint next; // next entry in hash table chain uint prev; // prev entry in hash table chain ushort pin; // number of accessing threads @@ -240,7 +239,7 @@ typedef struct { // note that this structure size // must be a multiple of 8 bytes -// in order to place dups correctly. +// in order to place PageZero correctly. typedef struct BtPage_ { uint cnt; // count of keys in page @@ -269,9 +268,9 @@ typedef struct { typedef struct { struct BtPage_ alloc[1]; // next page_no in right ptr - unsigned long long dups[1]; // global duplicate key uniqueifier unsigned char freechain[BtId]; // head of free page_nos chain - unsigned long long activepages; // number of active pages pages + unsigned long long activepages; // number of active pages + uint redopages; // number of redo pages in file } BtPageZero; // The object structure for Btree access @@ -299,7 +298,6 @@ typedef struct { uint latchhash; // number of latch hash table slots uint latchvictim; // next latch entry to examine uint latchpromote; // next latch entry to promote - uint redopages; // size of recovery buff in pages uint redolast; // last msync size of recovery buff uint redoend; // eof/end element in recovery buff int err; // last error @@ -365,7 +363,7 @@ typedef struct { extern void bt_close (BtDb *bt); extern BtDb *bt_open (BtMgr *mgr, BtMgr *main); -extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit); +extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no); extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no); extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no); extern void bt_unlockpage(BtLock mode, BtLatchSet *latch); @@ -457,15 +455,6 @@ int i; return id; } -uid bt_newdup (BtMgr *mgr) -{ -#ifdef unix - return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1; -#else - return _InterlockedIncrement64(mgr->pagezero->dups, 1); -#endif -} - // lite weight spin lock Latch Manager int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3) @@ -676,9 +665,9 @@ void ReadRelease (RWLock *lock) void bt_flushlsn (BtMgr *mgr, ushort thread_no) { uint cnt3 = 0, cnt2 = 0, cnt = 0; +uint entry, segment; BtLatchSet *latch; BtPage page; -uint entry; // flush dirty pool pages to the btree @@ -690,7 +679,7 @@ fprintf(stderr, "Start flushlsn "); bt_lockpage(BtLockRead, latch, thread_no); if( latch->dirty ) { - bt_writepage(mgr, page, latch->page_no, 0); + bt_writepage(mgr, page, latch->page_no); latch->dirty = 0, cnt++; } if( latch->pin & ~CLOCK_bit ) @@ -700,7 +689,9 @@ cnt2++; } fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2); fprintf(stderr, "begin sync"); - sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER); + for( segment = 0; segment < mgr->segments; segment++ ) + if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); fprintf(stderr, " end sync\n"); } @@ -714,8 +705,6 @@ uint offset = 0; BtKey *key; BtVal *val; - pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size); - hdr = (BtLogHdr *)mgr->redobuff; mgr->flushlsn = hdr->lsn; @@ -738,11 +727,11 @@ BtVal *val; } // recovery manager -- append new entry to recovery log -// flush to disk when it overflows. +// flush dirty pages to disk when it overflows. logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no) { -uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr); +uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr); uint amt = sizeof(BtLogHdr); BtLogHdr *hdr, *eof; uint last, end; @@ -757,7 +746,8 @@ uint last, end; if( amt > size - mgr->redoend ) { mgr->flushlsn = mgr->lsn; - msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC); + if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); mgr->redolast = 0; mgr->redoend = 0; bt_flushlsn(mgr, thread_no); @@ -788,22 +778,25 @@ uint last, end; memset (eof, 0, sizeof(BtLogHdr)); eof->lsn = mgr->lsn; - last = mgr->redolast & 0xfff; + last = mgr->redolast & ~0xfff; end = mgr->redoend; - mgr->redolast = end; - bt_releasemutex(mgr->redo); + if( end - last + sizeof(BtLogHdr) >= 8192 ) + if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); + else + mgr->redolast = end; - msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC); + bt_releasemutex(mgr->redo); return hdr->lsn; } // recovery manager -- append transaction to recovery log -// flush to disk when it overflows. +// flush dirty pages to disk when it overflows. logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no) { -uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr); +uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr); uint amt = 0, src, type; BtLogHdr *hdr, *eof; uint last, end; @@ -827,7 +820,8 @@ BtVal *val; if( amt > size - mgr->redoend ) { mgr->flushlsn = mgr->lsn; - msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC); + if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); mgr->redolast = 0; mgr->redoend = 0; bt_flushlsn (mgr, thread_no); @@ -876,12 +870,15 @@ BtVal *val; memset (eof, 0, sizeof(BtLogHdr)); eof->lsn = lsn; - last = mgr->redolast & 0xfff; + last = mgr->redolast & ~0xfff; end = mgr->redoend; - mgr->redolast = end; - bt_releasemutex(mgr->redo); - msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC); + if( end - last + sizeof(BtLogHdr) >= 8192 ) + if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); + else + mgr->redolast = end; + bt_releasemutex(mgr->redo); return lsn; } @@ -891,15 +888,15 @@ BtVal *val; BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no) { int flag = PROT_READ | PROT_WRITE; -uint segment = page_no >> 32; -unsigned char *perm; +uint segment = page_no >> 16; +BtPage perm; while( 1 ) { if( segment < mgr->segments ) { - perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits); - memcpy (page, perm, mgr->page_size); -if( page->page_no != page_no ) + perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits)); +if( perm->page_no != page_no ) abort(); + memcpy (page, perm, mgr->page_size); mgr->reads++; return 0; } @@ -921,18 +918,18 @@ abort(); // write page to permanent location in Btree file // clear the dirty bit -BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit) +BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no) { int flag = PROT_READ | PROT_WRITE; -uint segment = page_no >> 32; -unsigned char *perm; +uint segment = page_no >> 16; +BtPage perm; while( 1 ) { if( segment < mgr->segments ) { - perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits); + perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits)); +if( page_no > LEAF_page && perm->page_no != page_no) +abort(); memcpy (perm, page, mgr->page_size); - if( syncit ) - msync (perm, mgr->page_size, MS_SYNC); mgr->writes++; return 0; } @@ -966,7 +963,8 @@ void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch) BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch) { -BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool); +uid entry = latch - mgr->latchsets; +BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool); return page; } @@ -1082,7 +1080,7 @@ trynext: // no read-lock is required since page is not pinned. if( latch->dirty ) - if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) ) + if( mgr->err = bt_writepage (mgr, page, latch->page_no) ) return mgr->line = __LINE__, NULL; else latch->dirty = 0; @@ -1091,7 +1089,7 @@ trynext: memcpy (page, contents, mgr->page_size); latch->dirty = 1; } else if( bt_readpage (mgr, page, page_no) ) - return mgr->line = __LINE__, NULL; + return mgr->line = __LINE__, NULL; // link page as head of hash table chain // if this is a never before used entry, @@ -1112,7 +1110,6 @@ trynext: latch->pin = CLOCK_bit | 1; latch->page_no = page_no; - latch->entry = entry; latch->split = 0; bt_releasemutex (latch->modify); @@ -1136,8 +1133,6 @@ uint slot; if( mgr->redoend ) { eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend); memset (eof, 0, sizeof(BtLogHdr)); - - pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits); } // write remaining dirty pool pages to the btree @@ -1147,24 +1142,19 @@ uint slot; latch = mgr->latchsets + slot; if( latch->dirty ) { - bt_writepage(mgr, page, latch->page_no, 0); + bt_writepage(mgr, page, latch->page_no); latch->dirty = 0, num++; } } - // flush last batch to disk and clear - // redo recovery buffer on disk. + // clear redo recovery buffer on disk. - fdatasync (mgr->idx); - - if( mgr->redopages ) { + if( mgr->pagezero->redopages ) { eof = (BtLogHdr *)mgr->redobuff; memset (eof, 0, sizeof(BtLogHdr)); eof->lsn = mgr->lsn; - - pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits); - - sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER); + if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); } fprintf(stderr, "%d buffer pool pages flushed\n", num); @@ -1183,7 +1173,6 @@ uint slot; CloseHandle(mgr->hpool); #endif #ifdef unix - free (mgr->redobuff); close (mgr->idx); free (mgr); #else @@ -1303,17 +1292,18 @@ BtVal *val; memset (pagezero, 0, 1 << bits); pagezero->alloc->lvl = MIN_lvl - 1; pagezero->alloc->bits = mgr->page_bits; - bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1); + pagezero->redopages = redopages; + + bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1); pagezero->activepages = 2; // initialize left-most LEAF page in // alloc->left and count of active leaf pages. bt_putid (pagezero->alloc->left, LEAF_page); + ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits); - ftruncate (mgr->idx, REDO_page << mgr->page_bits); - - if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) { + if( bt_writepage (mgr, pagezero->alloc, 0) ) { fprintf (stderr, "Unable to create btree page zero\n"); return bt_mgrclose (mgr), NULL; } @@ -1340,7 +1330,7 @@ BtVal *val; pagezero->alloc->act = 1; pagezero->alloc->page_no = MIN_lvl - lvl; - if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) { + if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) { fprintf (stderr, "Unable to create btree page\n"); return bt_mgrclose (mgr), NULL; } @@ -1353,25 +1343,28 @@ mgrlatch: VirtualFree (pagezero, 0, MEM_RELEASE); #endif #ifdef unix - // mlock the pagezero page + // mlock the first segment of 64K pages flag = PROT_READ | PROT_WRITE; - mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits); - if( mgr->pagezero == MAP_FAILED ) { - fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno); + mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0); + mgr->segments = 1; + + if( mgr->pages[0] == MAP_FAILED ) { + fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno); return bt_mgrclose (mgr), NULL; } + + mgr->pagezero = (BtPageZero *)mgr->pages[0]; mlock (mgr->pagezero, mgr->page_size); + mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size; + mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits); + mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0); if( mgr->pagepool == MAP_FAILED ) { fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno); return bt_mgrclose (mgr), NULL; } - if( mgr->redopages = redopages ) { - ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits); - mgr->redobuff = valloc (redopages * mgr->page_size); - } #else flag = PAGE_READWRITE; mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL); @@ -1401,8 +1394,6 @@ mgrlatch: fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError()); return bt_mgrclose (mgr), NULL; } - if( mgr->redopages = redopages ) - mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE); #endif mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits)); @@ -1538,11 +1529,14 @@ int blk; else return mgr->line = __LINE__, mgr->err = BTERR_struct; - bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right)); mgr->pagezero->activepages++; + bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right)); + if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); bt_releasemutex(mgr->lock); + contents->page_no = page_no; memcpy (set->page, contents, mgr->page_size); set->latch->dirty = 1; return 0; @@ -1555,11 +1549,13 @@ int blk; // extend file into new page. mgr->pagezero->activepages++; - contents->page_no = page_no; - - pwrite (mgr->idx, contents, mgr->page_size, (uid)(page_no + 1) << mgr->page_bits); + if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); bt_releasemutex(mgr->lock); + contents->page_no = page_no; + pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits); + // don't load cache from btree page, load it from contents if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) ) @@ -1722,16 +1718,18 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set) set->page->free = 1; // decrement active page count - // and unlock allocation page mgr->pagezero->activepages--; - bt_releasemutex (mgr->lock); + if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); // unlock released page + // and unlock allocation page bt_unlockpage (BtLockDelete, set->latch); bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (mgr, set->latch); + bt_releasemutex (mgr->lock); } // a fence key was deleted from a page @@ -1826,6 +1824,9 @@ uint idx; // delete a page and manage keys // call with page writelocked +// returns with page removed +// from the page pool. + BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no) { unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray]; @@ -1835,6 +1836,7 @@ BtPageSet right[1]; uid page_no; BtKey *ptr; + // cache copy of fence key // to remove in parent @@ -1863,6 +1865,7 @@ BtKey *ptr; // pull contents of right peer into our empty page memcpy (set->page, right->page, mgr->page_size); + set->page->page_no = set->latch->page_no; set->latch->dirty = 1; // mark right page deleted and point it to left page @@ -1902,6 +1905,7 @@ BtKey *ptr; bt_freepage (mgr, right); bt_unlockpage (BtLockParent, set->latch); + bt_unpinlatch (mgr, set->latch); return 0; } @@ -1980,9 +1984,9 @@ BtVal *val; } else { set->latch->dirty = 1; bt_unlockpage(BtLockWrite, set->latch); + bt_unpinlatch (mgr, set->latch); } - bt_unpinlatch (mgr, set->latch); return 0; } @@ -2172,9 +2176,13 @@ uint nxt = mgr->page_size; unsigned char value[BtId]; BtPageSet left[1]; uid left_page_no; +BtPage frame; BtKey *ptr; BtVal *val; + frame = malloc (mgr->page_size); + memcpy (frame, root->page, mgr->page_size); + // save left page fence key for new root ptr = keyptr(root->page, root->page->cnt); @@ -2183,11 +2191,12 @@ BtVal *val; // Obtain an empty page to use, and copy the current // root contents into it, e.g. lower keys - if( bt_newpage(mgr, left, root->page, page_no) ) + if( bt_newpage(mgr, left, frame, page_no) ) return mgr->err; left_page_no = left->latch->page_no; bt_unpinlatch (mgr, left->latch); + free (frame); // preserve the page info at the bottom // of higher keys and set rest to zero @@ -2352,8 +2361,9 @@ uint prev; bt_putid(set->page->right, right->latch->page_no); set->page->min = nxt; set->page->cnt = idx; + free(frame); - return right->latch->entry; + return right->latch - mgr->latchsets; } // fix keys for newly split page @@ -2722,7 +2732,7 @@ BtVal *val; set->page->act--; node->dead = 0; - mgr->found++; + __sync_fetch_and_add(&mgr->found, 1); return 0; } @@ -2859,7 +2869,7 @@ int type; // add entries to redo log - if( bt->mgr->redopages ) + if( bt->mgr->pagezero->redopages ) if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) ) for( src = 0; src++ < source->cnt; ) locks[src].lsn = lsn; @@ -2896,7 +2906,7 @@ int type; ptr = keyptr(set->page, slot); if( !samepage ) { - locks[src].entry = set->latch->entry; + locks[src].entry = set->latch - bt->mgr->latchsets; locks[src].slot = slot; locks[src].reuse = 0; } else { @@ -3008,8 +3018,8 @@ int type; leaf = calloc (sizeof(AtomicKey), 1), que++; memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); + leaf->entry = prev->latch - bt->mgr->latchsets; leaf->page_no = prev->latch->page_no; - leaf->entry = prev->latch->entry; leaf->type = 0; if( tail ) @@ -3057,8 +3067,8 @@ int type; leaf = calloc (sizeof(AtomicKey), 1), que++; memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); + leaf->entry = prev->latch - bt->mgr->latchsets; leaf->page_no = prev->latch->page_no; - leaf->entry = prev->latch->entry; leaf->type = 0; if( tail ) @@ -3103,8 +3113,8 @@ int type; leaf = calloc (sizeof(AtomicKey), 1), que++; memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); + leaf->entry = prev->latch - bt->mgr->latchsets; leaf->page_no = prev->latch->page_no; - leaf->entry = prev->latch->entry; leaf->nounlock = 1; leaf->type = 2; @@ -3163,7 +3173,8 @@ int type; // is greater than the buffer pool // promote page into larger btree - while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 ) + if( bt->main ) + while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 ) if( bt_txnpromote (bt) ) return bt->mgr->err; @@ -3200,16 +3211,10 @@ BtVal *val; if( !bt_mutextry(set->latch->modify) ) continue; -// if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) { -// bt_releasemutex(set->latch->modify); -// continue; -// } - // skip this entry if it is pinned if( set->latch->pin & ~CLOCK_bit ) { bt_releasemutex(set->latch->modify); -// bt_releasemutex(bt->mgr->hashtable[idx].latch); continue; } @@ -3219,7 +3224,6 @@ BtVal *val; if( !set->latch->page_no || !bt_getid (set->page->right) ) { bt_releasemutex(set->latch->modify); -// bt_releasemutex(bt->mgr->hashtable[idx].latch); continue; } @@ -3231,7 +3235,6 @@ BtVal *val; if( set->page->lvl || set->page->free || set->page->kill ) { bt_releasemutex(set->latch->modify); -// bt_releasemutex(bt->mgr->hashtable[idx].latch); bt_unlockpage(BtLockWrite, set->latch); continue; } @@ -3240,16 +3243,6 @@ BtVal *val; set->latch->pin++; bt_releasemutex(set->latch->modify); -// bt_releasemutex(bt->mgr->hashtable[idx].latch); - - // if page is dirty, then - // sync it to the disk first. - - if( set->latch->dirty ) - if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) ) - return bt->mgr->line = __LINE__, bt->mgr->err; - else - set->latch->dirty = 0; // transfer slots in our selected page to larger btree if( !(set->latch->page_no % 100) ) @@ -3281,7 +3274,6 @@ fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cn if( bt_deletepage (bt->mgr, set, bt->thread_no) ) return bt->mgr->err; - bt_unpinlatch (bt->mgr, set->latch); return 0; } } @@ -3698,7 +3690,7 @@ FILE *in; posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL); #endif fprintf(stderr, "started counting\n"); - next = LEAF_page + bt->mgr->redopages + 1; + next = REDO_page + bt->mgr->pagezero->redopages; while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) { if( bt_readpage (bt->mgr, bt->cursor, page_no) ) @@ -3716,7 +3708,6 @@ FILE *in; break; } - fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found); bt_close (bt); #ifdef unix return NULL; @@ -3738,8 +3729,8 @@ pthread_t *threads; HANDLE *threads; #endif ThreadArg *args; +uint redopages = 0; uint poolsize = 0; -uint recovery = 0; uint mainpool = 0; uint mainbits = 0; float elapsed; @@ -3779,7 +3770,10 @@ BtKey *ptr; num = atoi(argv[6]); if( argc > 7 ) - recovery = atoi(argv[7]); + redopages = atoi(argv[7]); + + if( redopages + REDO_page > 65535 ) + fprintf (stderr, "Warning: Recovery buffer too large\n"); if( argc > 8 ) mainbits = atoi(argv[8]); @@ -3795,19 +3789,22 @@ BtKey *ptr; #endif args = malloc ((cnt + 1) * sizeof(ThreadArg)); - mgr = bt_mgr (argv[1], bits, poolsize, recovery); + mgr = bt_mgr (argv[1], bits, poolsize, redopages); if( !mgr ) { fprintf(stderr, "Index Open Error %s\n", argv[1]); exit (1); } - main = bt_mgr (argv[2], mainbits, mainpool, 0); + if( mainbits ) { + main = bt_mgr (argv[2], mainbits, mainpool, 0); - if( !main ) { - fprintf(stderr, "Index Open Error %s\n", argv[2]); - exit (1); - } + if( !main ) { + fprintf(stderr, "Index Open Error %s\n", argv[2]); + exit (1); + } + } else + main = NULL; // fire off threads @@ -3848,9 +3845,14 @@ BtKey *ptr; CloseHandle(threads[idx]); #endif bt_poolaudit(mgr); - bt_poolaudit(main); - bt_mgrclose (main); + if( main ) + bt_poolaudit(main); + + fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found); + + if( main ) + bt_mgrclose (main); bt_mgrclose (mgr); elapsed = getCpuTime(0) - start;