X-Git-Url: https://pd.if.org/git/?a=blobdiff_plain;f=threadskv10.c;h=62ee44e3d69fee2b61f67b2cd60044f4b08761a4;hb=57594095c9e98ec77e532919b42f277091d2a00e;hp=129434fda0f8b8756ca60492ac578a408c3cb71d;hpb=4fa440cc8ba094e8f043922450fe5e1e06e374cf;p=btree diff --git a/threadskv10.c b/threadskv10.c index 129434f..62ee44e 100644 --- a/threadskv10.c +++ b/threadskv10.c @@ -110,28 +110,6 @@ typedef enum{ BtLockAtomic = 32 } BtLock; -// lite weight spin latch - -volatile typedef struct { - ushort exclusive:1; - ushort pending:1; - ushort share:14; -} BtMutexLatch; - -#define XCL 1 -#define PEND 2 -#define BOTH 3 -#define SHARE 4 - -/* -// exclusive is set for write access - -volatile typedef struct { - unsigned char exclusive[1]; - unsigned char filler; -} BtMutexLatch; -*/ -/* typedef struct { union { struct { @@ -141,7 +119,7 @@ typedef struct { uint value[1]; }; } BtMutexLatch; -*/ + #define XCL 1 #define WRT 2 @@ -192,7 +170,6 @@ typedef struct { WOLock parent[1]; // Posting of fence key in parent WOLock atomic[1]; // Atomic update in progress uint split; // right split page atomic insert - uint entry; // entry slot in latch table uint next; // next entry in hash table chain uint prev; // prev entry in hash table chain ushort pin; // number of accessing threads @@ -262,7 +239,7 @@ typedef struct { // note that this structure size // must be a multiple of 8 bytes -// in order to place dups correctly. +// in order to place PageZero correctly. typedef struct BtPage_ { uint cnt; // count of keys in page @@ -291,9 +268,9 @@ typedef struct { typedef struct { struct BtPage_ alloc[1]; // next page_no in right ptr - unsigned long long dups[1]; // global duplicate key uniqueifier unsigned char freechain[BtId]; // head of free page_nos chain - unsigned long long activepages; // number of active pages pages + unsigned long long activepages; // number of active pages + uint redopages; // number of redo pages in file } BtPageZero; // The object structure for Btree access @@ -321,7 +298,6 @@ typedef struct { uint latchhash; // number of latch hash table slots uint latchvictim; // next latch entry to examine uint latchpromote; // next latch entry to promote - uint redopages; // size of recovery buff in pages uint redolast; // last msync size of recovery buff uint redoend; // eof/end element in recovery buff int err; // last error @@ -387,7 +363,7 @@ typedef struct { extern void bt_close (BtDb *bt); extern BtDb *bt_open (BtMgr *mgr, BtMgr *main); -extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit); +extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no); extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no); extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no); extern void bt_unlockpage(BtLock mode, BtLatchSet *latch); @@ -479,15 +455,6 @@ int i; return id; } -uid bt_newdup (BtMgr *mgr) -{ -#ifdef unix - return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1; -#else - return _InterlockedIncrement64(mgr->pagezero->dups, 1); -#endif -} - // lite weight spin lock Latch Manager int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3) @@ -497,46 +464,6 @@ int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *add void bt_mutexlock(BtMutexLatch *latch) { -ushort prev; - - do { -#ifdef unix - prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL); -#else - prev = _InterlockedOr16((ushort *)latch, PEND | XCL); -#endif - if( !(prev & XCL) ) - if( !(prev & ~BOTH) ) - return; - else -#ifdef unix 
- __sync_fetch_and_and ((ushort *)latch, ~XCL); -#else - _InterlockedAnd16((ushort *)latch, ~XCL); -#endif -#ifdef unix - } while( sched_yield(), 1 ); -#else - } while( SwitchToThread(), 1 ); -#endif -/* -unsigned char prev; - - do { -#ifdef unix - prev = __sync_fetch_and_or(latch->exclusive, XCL); -#else - prev = _InterlockedOr8(latch->exclusive, XCL); -#endif - if( !(prev & XCL) ) - return; -#ifdef unix - } while( sched_yield(), 1 ); -#else - } while( SwitchToThread(), 1 ); -#endif -*/ -/* BtMutexLatch prev[1]; uint slept = 0; @@ -556,7 +483,7 @@ uint slept = 0; sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr); slept = 1; - } */ + } } // try to obtain write lock @@ -566,38 +493,6 @@ uint slept = 0; int bt_mutextry(BtMutexLatch *latch) { -ushort prev; - -#ifdef unix - prev = __sync_fetch_and_or((ushort *)latch, XCL); -#else - prev = _InterlockedOr16((ushort *)latch, XCL); -#endif - // take write access if all bits are clear - - if( !(prev & XCL) ) - if( !(prev & ~BOTH) ) - return 1; - else -#ifdef unix - __sync_fetch_and_and ((ushort *)latch, ~XCL); -#else - _InterlockedAnd16((ushort *)latch, ~XCL); -#endif - return 0; -/* -unsigned char prev; - -#ifdef unix - prev = __sync_fetch_and_or(latch->exclusive, XCL); -#else - prev = _InterlockedOr8(latch->exclusive, XCL); -#endif - // take write access if all bits are clear - - return !(prev & XCL); -*/ -/* BtMutexLatch prev[1]; *prev->value = __sync_fetch_and_or(latch->value, XCL); @@ -605,29 +500,18 @@ BtMutexLatch prev[1]; // take write access if exclusive bit is clear return !prev->bits->xlock; -*/ } // clear write mode void bt_releasemutex(BtMutexLatch *latch) { -#ifdef unix - __sync_fetch_and_and((ushort *)latch, ~BOTH); -#else - _InterlockedAnd16((ushort *)latch, ~BOTH); -#endif -/* - *latch->exclusive = 0; -*/ -/* BtMutexLatch prev[1]; *prev->value = __sync_fetch_and_and(latch->value, ~XCL); if( prev->bits->wrt ) sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ); -*/ } // Write-Only Queue Lock @@ -781,9 +665,9 @@ void ReadRelease (RWLock *lock) void bt_flushlsn (BtMgr *mgr, ushort thread_no) { uint cnt3 = 0, cnt2 = 0, cnt = 0; +uint entry, segment; BtLatchSet *latch; BtPage page; -uint entry; // flush dirty pool pages to the btree @@ -795,7 +679,7 @@ fprintf(stderr, "Start flushlsn "); bt_lockpage(BtLockRead, latch, thread_no); if( latch->dirty ) { - bt_writepage(mgr, page, latch->page_no, 0); + bt_writepage(mgr, page, latch->page_no); latch->dirty = 0, cnt++; } if( latch->pin & ~CLOCK_bit ) @@ -805,7 +689,9 @@ cnt2++; } fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2); fprintf(stderr, "begin sync"); - sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER); + for( segment = 0; segment < mgr->segments; segment++ ) + if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); fprintf(stderr, " end sync\n"); } @@ -819,8 +705,6 @@ uint offset = 0; BtKey *key; BtVal *val; - pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size); - hdr = (BtLogHdr *)mgr->redobuff; mgr->flushlsn = hdr->lsn; @@ -843,11 +727,11 @@ BtVal *val; } // recovery manager -- append new entry to recovery log -// flush to disk when it overflows. +// flush dirty pages to disk when it overflows. 
logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no) { -uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr); +uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr); uint amt = sizeof(BtLogHdr); BtLogHdr *hdr, *eof; uint last, end; @@ -862,7 +746,8 @@ uint last, end; if( amt > size - mgr->redoend ) { mgr->flushlsn = mgr->lsn; - msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC); + if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); mgr->redolast = 0; mgr->redoend = 0; bt_flushlsn(mgr, thread_no); @@ -893,22 +778,25 @@ uint last, end; memset (eof, 0, sizeof(BtLogHdr)); eof->lsn = mgr->lsn; - last = mgr->redolast & 0xfff; + last = mgr->redolast & ~0xfff; end = mgr->redoend; - mgr->redolast = end; - bt_releasemutex(mgr->redo); + if( end - last + sizeof(BtLogHdr) >= 8192 ) + if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); + else + mgr->redolast = end; - msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC); + bt_releasemutex(mgr->redo); return hdr->lsn; } // recovery manager -- append transaction to recovery log -// flush to disk when it overflows. +// flush dirty pages to disk when it overflows. logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no) { -uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr); +uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr); uint amt = 0, src, type; BtLogHdr *hdr, *eof; uint last, end; @@ -932,7 +820,8 @@ BtVal *val; if( amt > size - mgr->redoend ) { mgr->flushlsn = mgr->lsn; - msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC); + if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); mgr->redolast = 0; mgr->redoend = 0; bt_flushlsn (mgr, thread_no); @@ -981,12 +870,15 @@ BtVal *val; memset (eof, 0, sizeof(BtLogHdr)); eof->lsn = lsn; - last = mgr->redolast & 0xfff; + last = mgr->redolast & ~0xfff; end = mgr->redoend; - mgr->redolast = end; - bt_releasemutex(mgr->redo); - msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC); + if( end - last + sizeof(BtLogHdr) >= 8192 ) + if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); + else + mgr->redolast = end; + bt_releasemutex(mgr->redo); return lsn; } @@ -995,27 +887,16 @@ BtVal *val; BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no) { -off64_t off = page_no << mgr->page_bits; - - if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) { - fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno); - return BTERR_read; - } -if( page->page_no != page_no ) -abort(); - mgr->reads++; - return 0; -/* int flag = PROT_READ | PROT_WRITE; -uint segment = page_no >> 32; -unsigned char *perm; +uint segment = page_no >> 16; +BtPage perm; while( 1 ) { if( segment < mgr->segments ) { - perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits); - memcpy (page, perm, mgr->page_size); -if( page->page_no != page_no ) + perm = 
(BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits)); +if( perm->page_no != page_no ) abort(); + memcpy (page, perm, mgr->page_size); mgr->reads++; return 0; } @@ -1031,33 +912,24 @@ abort(); mgr->segments++; bt_releasemutex (mgr->maps); - } */ + } } // write page to permanent location in Btree file // clear the dirty bit -BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit) +BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no) { -off64_t off = page_no << mgr->page_bits; - - if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size ) { - fprintf (stderr, "Unable to write page %d errno = %d\n", page_no, errno); - return BTERR_wrt; - } - mgr->writes++; - return 0; -/* int flag = PROT_READ | PROT_WRITE; -uint segment = page_no >> 32; -unsigned char *perm; +uint segment = page_no >> 16; +BtPage perm; while( 1 ) { if( segment < mgr->segments ) { - perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits); + perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits)); +if( page_no > LEAF_page && perm->page_no != page_no) +abort(); memcpy (perm, page, mgr->page_size); - if( syncit ) - msync (perm, mgr->page_size, MS_SYNC); mgr->writes++; return 0; } @@ -1072,7 +944,7 @@ unsigned char *perm; mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16)); bt_releasemutex (mgr->maps); mgr->segments++; - } */ + } } // set CLOCK bit in latch @@ -1091,7 +963,8 @@ void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch) BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch) { -BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool); +uid entry = latch - mgr->latchsets; +BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool); return page; } @@ -1207,7 +1080,7 @@ trynext: // no read-lock is required since page is not pinned. if( latch->dirty ) - if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) ) + if( mgr->err = bt_writepage (mgr, page, latch->page_no) ) return mgr->line = __LINE__, NULL; else latch->dirty = 0; @@ -1216,7 +1089,7 @@ trynext: memcpy (page, contents, mgr->page_size); latch->dirty = 1; } else if( bt_readpage (mgr, page, page_no) ) - return mgr->line = __LINE__, NULL; + return mgr->line = __LINE__, NULL; // link page as head of hash table chain // if this is a never before used entry, @@ -1237,7 +1110,6 @@ trynext: latch->pin = CLOCK_bit | 1; latch->page_no = page_no; - latch->entry = entry; latch->split = 0; bt_releasemutex (latch->modify); @@ -1261,8 +1133,6 @@ uint slot; if( mgr->redoend ) { eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend); memset (eof, 0, sizeof(BtLogHdr)); - - pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits); } // write remaining dirty pool pages to the btree @@ -1272,24 +1142,19 @@ uint slot; latch = mgr->latchsets + slot; if( latch->dirty ) { - bt_writepage(mgr, page, latch->page_no, 0); + bt_writepage(mgr, page, latch->page_no); latch->dirty = 0, num++; } } - // flush last batch to disk and clear - // redo recovery buffer on disk. - - fdatasync (mgr->idx); + // clear redo recovery buffer on disk. 
- if( mgr->redopages ) { + if( mgr->pagezero->redopages ) { eof = (BtLogHdr *)mgr->redobuff; memset (eof, 0, sizeof(BtLogHdr)); eof->lsn = mgr->lsn; - - pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits); - - sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER); + if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); } fprintf(stderr, "%d buffer pool pages flushed\n", num); @@ -1308,7 +1173,6 @@ uint slot; CloseHandle(mgr->hpool); #endif #ifdef unix - free (mgr->redobuff); close (mgr->idx); free (mgr); #else @@ -1428,17 +1292,18 @@ BtVal *val; memset (pagezero, 0, 1 << bits); pagezero->alloc->lvl = MIN_lvl - 1; pagezero->alloc->bits = mgr->page_bits; - bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1); + pagezero->redopages = redopages; + + bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1); pagezero->activepages = 2; // initialize left-most LEAF page in // alloc->left and count of active leaf pages. bt_putid (pagezero->alloc->left, LEAF_page); + ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits); - ftruncate (mgr->idx, REDO_page << mgr->page_bits); - - if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) { + if( bt_writepage (mgr, pagezero->alloc, 0) ) { fprintf (stderr, "Unable to create btree page zero\n"); return bt_mgrclose (mgr), NULL; } @@ -1465,7 +1330,7 @@ BtVal *val; pagezero->alloc->act = 1; pagezero->alloc->page_no = MIN_lvl - lvl; - if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) { + if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) { fprintf (stderr, "Unable to create btree page\n"); return bt_mgrclose (mgr), NULL; } @@ -1478,25 +1343,28 @@ mgrlatch: VirtualFree (pagezero, 0, MEM_RELEASE); #endif #ifdef unix - // mlock the pagezero page + // mlock the first segment of 64K pages flag = PROT_READ | PROT_WRITE; - mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits); - if( mgr->pagezero == MAP_FAILED ) { - fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno); + mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0); + mgr->segments = 1; + + if( mgr->pages[0] == MAP_FAILED ) { + fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno); return bt_mgrclose (mgr), NULL; } + + mgr->pagezero = (BtPageZero *)mgr->pages[0]; mlock (mgr->pagezero, mgr->page_size); + mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size; + mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits); + mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0); if( mgr->pagepool == MAP_FAILED ) { fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno); return bt_mgrclose (mgr), NULL; } - if( mgr->redopages = redopages ) { - ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits); - mgr->redobuff = valloc (redopages * mgr->page_size); - } #else flag = PAGE_READWRITE; mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL); @@ -1526,8 +1394,6 @@ mgrlatch: fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError()); return bt_mgrclose (mgr), NULL; } - if( mgr->redopages = redopages ) - mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE); #endif mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits)); @@ 
-1663,11 +1529,14 @@ int blk; else return mgr->line = __LINE__, mgr->err = BTERR_struct; - bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right)); mgr->pagezero->activepages++; + bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right)); + if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); bt_releasemutex(mgr->lock); + contents->page_no = page_no; memcpy (set->page, contents, mgr->page_size); set->latch->dirty = 1; return 0; @@ -1680,13 +1549,14 @@ int blk; // extend file into new page. mgr->pagezero->activepages++; - - ftruncate (mgr->idx, (uid)(page_no + 1) << mgr->page_bits); + if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); bt_releasemutex(mgr->lock); - // don't load cache from btree page, load it from contents - contents->page_no = page_no; + pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits); + + // don't load cache from btree page, load it from contents if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) ) set->page = bt_mappage (mgr, set->latch); @@ -1848,16 +1718,18 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set) set->page->free = 1; // decrement active page count - // and unlock allocation page mgr->pagezero->activepages--; - bt_releasemutex (mgr->lock); + if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); // unlock released page + // and unlock allocation page bt_unlockpage (BtLockDelete, set->latch); bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (mgr, set->latch); + bt_releasemutex (mgr->lock); } // a fence key was deleted from a page @@ -1952,6 +1824,9 @@ uint idx; // delete a page and manage keys // call with page writelocked +// returns with page removed +// from the page pool. + BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no) { unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray]; @@ -1961,6 +1836,7 @@ BtPageSet right[1]; uid page_no; BtKey *ptr; + // cache copy of fence key // to remove in parent @@ -1989,6 +1865,7 @@ BtKey *ptr; // pull contents of right peer into our empty page memcpy (set->page, right->page, mgr->page_size); + set->page->page_no = set->latch->page_no; set->latch->dirty = 1; // mark right page deleted and point it to left page @@ -2028,6 +1905,7 @@ BtKey *ptr; bt_freepage (mgr, right); bt_unlockpage (BtLockParent, set->latch); + bt_unpinlatch (mgr, set->latch); return 0; } @@ -2106,9 +1984,9 @@ BtVal *val; } else { set->latch->dirty = 1; bt_unlockpage(BtLockWrite, set->latch); + bt_unpinlatch (mgr, set->latch); } - bt_unpinlatch (mgr, set->latch); return 0; } @@ -2298,9 +2176,13 @@ uint nxt = mgr->page_size; unsigned char value[BtId]; BtPageSet left[1]; uid left_page_no; +BtPage frame; BtKey *ptr; BtVal *val; + frame = malloc (mgr->page_size); + memcpy (frame, root->page, mgr->page_size); + // save left page fence key for new root ptr = keyptr(root->page, root->page->cnt); @@ -2309,11 +2191,12 @@ BtVal *val; // Obtain an empty page to use, and copy the current // root contents into it, e.g. 
lower keys - if( bt_newpage(mgr, left, root->page, page_no) ) + if( bt_newpage(mgr, left, frame, page_no) ) return mgr->err; left_page_no = left->latch->page_no; bt_unpinlatch (mgr, left->latch); + free (frame); // preserve the page info at the bottom // of higher keys and set rest to zero @@ -2478,8 +2361,9 @@ uint prev; bt_putid(set->page->right, right->latch->page_no); set->page->min = nxt; set->page->cnt = idx; + free(frame); - return right->latch->entry; + return right->latch - mgr->latchsets; } // fix keys for newly split page @@ -2848,7 +2732,7 @@ BtVal *val; set->page->act--; node->dead = 0; - mgr->found++; + __sync_fetch_and_add(&mgr->found, 1); return 0; } @@ -2985,7 +2869,7 @@ int type; // add entries to redo log - if( bt->mgr->redopages ) + if( bt->mgr->pagezero->redopages ) if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) ) for( src = 0; src++ < source->cnt; ) locks[src].lsn = lsn; @@ -3022,7 +2906,7 @@ int type; ptr = keyptr(set->page, slot); if( !samepage ) { - locks[src].entry = set->latch->entry; + locks[src].entry = set->latch - bt->mgr->latchsets; locks[src].slot = slot; locks[src].reuse = 0; } else { @@ -3134,8 +3018,8 @@ int type; leaf = calloc (sizeof(AtomicKey), 1), que++; memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); + leaf->entry = prev->latch - bt->mgr->latchsets; leaf->page_no = prev->latch->page_no; - leaf->entry = prev->latch->entry; leaf->type = 0; if( tail ) @@ -3183,8 +3067,8 @@ int type; leaf = calloc (sizeof(AtomicKey), 1), que++; memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); + leaf->entry = prev->latch - bt->mgr->latchsets; leaf->page_no = prev->latch->page_no; - leaf->entry = prev->latch->entry; leaf->type = 0; if( tail ) @@ -3229,8 +3113,8 @@ int type; leaf = calloc (sizeof(AtomicKey), 1), que++; memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); + leaf->entry = prev->latch - bt->mgr->latchsets; leaf->page_no = prev->latch->page_no; - leaf->entry = prev->latch->entry; leaf->nounlock = 1; leaf->type = 2; @@ -3289,7 +3173,8 @@ int type; // is greater than the buffer pool // promote page into larger btree - while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 ) + if( bt->main ) + while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 ) if( bt_txnpromote (bt) ) return bt->mgr->err; @@ -3326,16 +3211,10 @@ BtVal *val; if( !bt_mutextry(set->latch->modify) ) continue; -// if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) { -// bt_releasemutex(set->latch->modify); -// continue; -// } - // skip this entry if it is pinned if( set->latch->pin & ~CLOCK_bit ) { bt_releasemutex(set->latch->modify); -// bt_releasemutex(bt->mgr->hashtable[idx].latch); continue; } @@ -3345,7 +3224,6 @@ BtVal *val; if( !set->latch->page_no || !bt_getid (set->page->right) ) { bt_releasemutex(set->latch->modify); -// bt_releasemutex(bt->mgr->hashtable[idx].latch); continue; } @@ -3357,7 +3235,6 @@ BtVal *val; if( set->page->lvl || set->page->free || set->page->kill ) { bt_releasemutex(set->latch->modify); -// bt_releasemutex(bt->mgr->hashtable[idx].latch); bt_unlockpage(BtLockWrite, set->latch); continue; } @@ -3366,16 +3243,6 @@ BtVal *val; set->latch->pin++; bt_releasemutex(set->latch->modify); -// bt_releasemutex(bt->mgr->hashtable[idx].latch); - - // if page is dirty, then - // sync it to the disk first. 
- - if( set->latch->dirty ) - if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) ) - return bt->mgr->line = __LINE__, bt->mgr->err; - else - set->latch->dirty = 0; // transfer slots in our selected page to larger btree if( !(set->latch->page_no % 100) ) @@ -3407,7 +3274,6 @@ fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cn if( bt_deletepage (bt->mgr, set, bt->thread_no) ) return bt->mgr->err; - bt_unpinlatch (bt->mgr, set->latch); return 0; } } @@ -3824,7 +3690,7 @@ FILE *in; posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL); #endif fprintf(stderr, "started counting\n"); - next = LEAF_page + bt->mgr->redopages + 1; + next = REDO_page + bt->mgr->pagezero->redopages; while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) { if( bt_readpage (bt->mgr, bt->cursor, page_no) ) @@ -3842,7 +3708,6 @@ FILE *in; break; } - fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found); bt_close (bt); #ifdef unix return NULL; @@ -3864,8 +3729,8 @@ pthread_t *threads; HANDLE *threads; #endif ThreadArg *args; +uint redopages = 0; uint poolsize = 0; -uint recovery = 0; uint mainpool = 0; uint mainbits = 0; float elapsed; @@ -3905,7 +3770,10 @@ BtKey *ptr; num = atoi(argv[6]); if( argc > 7 ) - recovery = atoi(argv[7]); + redopages = atoi(argv[7]); + + if( redopages + REDO_page > 65535 ) + fprintf (stderr, "Warning: Recovery buffer too large\n"); if( argc > 8 ) mainbits = atoi(argv[8]); @@ -3921,19 +3789,22 @@ BtKey *ptr; #endif args = malloc ((cnt + 1) * sizeof(ThreadArg)); - mgr = bt_mgr (argv[1], bits, poolsize, recovery); + mgr = bt_mgr (argv[1], bits, poolsize, redopages); if( !mgr ) { fprintf(stderr, "Index Open Error %s\n", argv[1]); exit (1); } - main = bt_mgr (argv[2], mainbits, mainpool, 0); + if( mainbits ) { + main = bt_mgr (argv[2], mainbits, mainpool, 0); - if( !main ) { - fprintf(stderr, "Index Open Error %s\n", argv[2]); - exit (1); - } + if( !main ) { + fprintf(stderr, "Index Open Error %s\n", argv[2]); + exit (1); + } + } else + main = NULL; // fire off threads @@ -3974,9 +3845,14 @@ BtKey *ptr; CloseHandle(threads[idx]); #endif bt_poolaudit(mgr); - bt_poolaudit(main); - bt_mgrclose (main); + if( main ) + bt_poolaudit(main); + + fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found); + + if( main ) + bt_mgrclose (main); bt_mgrclose (mgr); elapsed = getCpuTime(0) - start;
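
A minimal, self-contained sketch of the mmap/msync page-write pattern that the hunks above switch bt_readpage, bt_writepage and bt_flushlsn to: the index file is mapped MAP_SHARED in segments of 65536 pages, a page write is a memcpy into the mapping, and durability comes from msync. This is an illustration only, not code from the patch; PAGE_BITS, SEG_PAGES, seg_write_page and demo.idx are hypothetical names, not identifiers from threadskv10.c.

/* sketch of the segment-mapped page write adopted by the patch */
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>

#define PAGE_BITS 12U			/* 4096 byte pages for the demo */
#define SEG_PAGES 65536ULL		/* pages per mapped segment, as in the patch */

/* copy one page image into the mapped segment and force it to disk */
int seg_write_page (unsigned char *seg, unsigned long long page_no, void *page)
{
	unsigned char *perm = seg + ((page_no & (SEG_PAGES - 1)) << PAGE_BITS);

	memcpy (perm, page, 1U << PAGE_BITS);

	if( msync (perm, 1U << PAGE_BITS, MS_SYNC) < 0 ) {
		fprintf (stderr, "msync error %d\n", errno);
		return -1;
	}

	return 0;
}

int main ()
{
	unsigned char page[1U << PAGE_BITS] = "demo page image";
	unsigned char *seg;
	int idx;

	if( (idx = open ("demo.idx", O_CREAT | O_RDWR, 0666)) < 0 )
		return 1;

	/* extend the file to one full segment, then map it MAP_SHARED */
	if( ftruncate (idx, (off_t)(SEG_PAGES << PAGE_BITS)) < 0 )
		return 1;

	seg = mmap (0, SEG_PAGES << PAGE_BITS, PROT_READ | PROT_WRITE, MAP_SHARED, idx, 0);

	if( seg == MAP_FAILED )
		return 1;

	seg_write_page (seg, 3, page);	/* write page 3 through the mapping */

	munmap (seg, SEG_PAGES << PAGE_BITS);
	close (idx);
	return 0;
}

Because the mapping is MAP_SHARED, ordinary stores reach the page cache immediately; msync (or the per-segment loop the patch adds to bt_flushlsn) is only needed at the points where the log protocol requires the pages to be durable.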