X-Git-Url: https://pd.if.org/git/?p=btree;a=blobdiff_plain;f=threadskv10.c;h=194757b25f2aa192b379b01511ef567338dd76ad;hp=cd307f6e9a24b60340f14737e0ee42d0a675ebb8;hb=392e5f08cc164c87e56153aa78a740f93325750e;hpb=172ab9f8496947eb879883777aeaf3f4b17f3522 diff --git a/threadskv10.c b/threadskv10.c index cd307f6..194757b 100644 --- a/threadskv10.c +++ b/threadskv10.c @@ -9,7 +9,7 @@ // redo log for failure recovery // and LSM B-trees for write optimization -// 11 OCT 2014 +// 15 OCT 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -113,15 +113,15 @@ typedef enum{ typedef struct { union { struct { - volatile uint xlock:1; // one writer has exclusive lock - volatile uint wrt:31; // count of other writers waiting + volatile ushort xlock[1]; // one writer has exclusive lock + volatile ushort wrt[1]; // count of other writers waiting } bits[1]; uint value[1]; }; } BtMutexLatch; #define XCL 1 -#define WRT 2 +#define WRT 65536 // definition for phase-fair reader/writer lock implementation @@ -130,16 +130,20 @@ typedef struct { volatile ushort rout[1]; volatile ushort ticket[1]; volatile ushort serving[1]; - volatile ushort tid[1]; - volatile ushort dup[1]; } RWLock; -// write only lock +// write only reentrant lock typedef struct { - BtMutexLatch xcl[1]; - volatile ushort tid[1]; - volatile ushort dup[1]; + BtMutexLatch xcl[1]; + union { + struct { + volatile ushort tid[1]; + volatile ushort dup[1]; + } bits[1]; + uint value[1]; + }; + volatile uint waiters[1]; } WOLock; #define PHID 0x1 @@ -170,11 +174,11 @@ typedef struct { WOLock parent[1]; // Posting of fence key in parent WOLock atomic[1]; // Atomic update in progress uint split; // right split page atomic insert - uint entry; // entry slot in latch table uint next; // next entry in hash table chain uint prev; // prev entry in hash table chain ushort pin; // number of accessing threads - unsigned char dirty; // page in cache is dirty (atomic set) + unsigned char dirty; // page in cache is dirty (atomic setable) + unsigned char promote; // page in cache is dirty (atomic setable) BtMutexLatch modify[1]; // modify entry lite latch } BtLatchSet; @@ -240,7 +244,7 @@ typedef struct { // note that this structure size // must be a multiple of 8 bytes -// in order to place dups correctly. +// in order to place PageZero correctly. typedef struct BtPage_ { uint cnt; // count of keys in page @@ -269,9 +273,9 @@ typedef struct { typedef struct { struct BtPage_ alloc[1]; // next page_no in right ptr - unsigned long long dups[1]; // global duplicate key uniqueifier unsigned char freechain[BtId]; // head of free page_nos chain - unsigned long long activepages; // number of active pages pages + unsigned long long activepages; // number of active pages + uint redopages; // number of redo pages in file } BtPageZero; // The object structure for Btree access @@ -299,7 +303,6 @@ typedef struct { uint latchhash; // number of latch hash table slots uint latchvictim; // next latch entry to examine uint latchpromote; // next latch entry to promote - uint redopages; // size of recovery buff in pages uint redolast; // last msync size of recovery buff uint redoend; // eof/end element in recovery buff int err; // last error @@ -322,6 +325,15 @@ typedef struct { unsigned char key[BT_keyarray]; // last found complete key } BtDb; +// atomic txn structures + +typedef struct { + logseqno reqlsn; // redo log seq no required + uint entry; // latch table entry number + uint slot:31; // page slot number + uint reuse:1; // reused previous page +} AtomicTxn; + // Catastrophic errors typedef enum { @@ -365,7 +377,7 @@ typedef struct { extern void bt_close (BtDb *bt); extern BtDb *bt_open (BtMgr *mgr, BtMgr *main); -extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit); +extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no); extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no); extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no); extern void bt_unlockpage(BtLock mode, BtLatchSet *latch); @@ -385,7 +397,8 @@ extern void bt_mgrclose (BtMgr *mgr); extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no); extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no); -// transaction functions +// atomic transaction functions +BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no); BTERR bt_txnpromote (BtDb *bt); // The page is allocated from low and hi ends. @@ -457,15 +470,6 @@ int i; return id; } -uid bt_newdup (BtMgr *mgr) -{ -#ifdef unix - return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1; -#else - return _InterlockedIncrement64(mgr->pagezero->dups, 1); -#endif -} - // lite weight spin lock Latch Manager int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3) @@ -481,18 +485,18 @@ uint slept = 0; while( 1 ) { *prev->value = __sync_fetch_and_or(latch->value, XCL); - if( !prev->bits->xlock ) { // did we set XCL bit? + if( !*prev->bits->xlock ) { // did we set XCL? if( slept ) __sync_fetch_and_sub(latch->value, WRT); return; } if( !slept ) { - prev->bits->wrt++; + *prev->bits->wrt += 1; __sync_fetch_and_add(latch->value, WRT); } - sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr); + sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr); slept = 1; } } @@ -508,9 +512,9 @@ BtMutexLatch prev[1]; *prev->value = __sync_fetch_and_or(latch->value, XCL); - // take write access if exclusive bit is clear + // take write access if exclusive bit was clear - return !prev->bits->xlock; + return !*prev->bits->xlock; } // clear write mode @@ -521,32 +525,82 @@ BtMutexLatch prev[1]; *prev->value = __sync_fetch_and_and(latch->value, ~XCL); - if( prev->bits->wrt ) - sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ); + if( *prev->bits->wrt ) + sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr ); } -// Write-Only Queue Lock +// Write-Only Reentrant Lock void WriteOLock (WOLock *lock, ushort tid) { - if( *lock->tid == tid ) { - *lock->dup += 1; +uint prev, waited = 0; + + while( 1 ) { + bt_mutexlock(lock->xcl); + + if( waited ) + *lock->waiters -= 1; + + if( *lock->bits->tid == tid ) { + *lock->bits->dup += 1; + bt_releasemutex(lock->xcl); + return; + } + if( !*lock->bits->tid ) { + *lock->bits->tid = tid; + bt_releasemutex(lock->xcl); return; } - bt_mutexlock(lock->xcl); - *lock->tid = tid; + waited = 1; + *lock->waiters += 1; + prev = *lock->value; + + bt_releasemutex(lock->xcl); + + sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, prev, NULL, NULL, QueWr ); + } } void WriteORelease (WOLock *lock) { - if( *lock->dup ) { - *lock->dup -= 1; - return; + bt_mutexlock(lock->xcl); + + if( *lock->bits->dup ) { + *lock->bits->dup -= 1; + bt_releasemutex(lock->xcl); + return; + } + + *lock->bits->tid = 0; + + if( *lock->waiters ) + sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueWr ); + bt_releasemutex(lock->xcl); +} + +// clear lock of holders and waiters + +ClearWOLock (WOLock *lock) +{ + while( 1 ) { + bt_mutexlock(lock->xcl); + + if( *lock->waiters ) { + bt_releasemutex(lock->xcl); + sched_yield(); + continue; + } + + if( *lock->bits->tid ) { + bt_releasemutex(lock->xcl); + sched_yield(); + continue; } - *lock->tid = 0; bt_releasemutex(lock->xcl); + return; + } } // Phase-Fair reader/writer lock implementation @@ -555,10 +609,6 @@ void WriteLock (RWLock *lock, ushort tid) { ushort w, r, tix; - if( *lock->tid == tid ) { - *lock->dup += 1; - return; - } #ifdef unix tix = __sync_fetch_and_add (lock->ticket, 1); #else @@ -585,17 +635,10 @@ ushort w, r, tix; #else SwitchToThread(); #endif - *lock->tid = tid; } void WriteRelease (RWLock *lock) { - if( *lock->dup ) { - *lock->dup -= 1; - return; - } - - *lock->tid = 0; #ifdef unix __sync_fetch_and_and (lock->rin, ~MASK); #else @@ -611,12 +654,6 @@ int ReadTry (RWLock *lock, ushort tid) { ushort w; - // OK if write lock already held by same thread - - if( *lock->tid == tid ) { - *lock->dup += 1; - return 1; - } #ifdef unix w = __sync_fetch_and_add (lock->rin, RINC) & MASK; #else @@ -639,10 +676,6 @@ void ReadLock (RWLock *lock, ushort tid) { ushort w; - if( *lock->tid == tid ) { - *lock->dup += 1; - return; - } #ifdef unix w = __sync_fetch_and_add (lock->rin, RINC) & MASK; #else @@ -659,11 +692,6 @@ ushort w; void ReadRelease (RWLock *lock) { - if( *lock->dup ) { - *lock->dup -= 1; - return; - } - #ifdef unix __sync_fetch_and_add (lock->rout, RINC); #else @@ -676,9 +704,9 @@ void ReadRelease (RWLock *lock) void bt_flushlsn (BtMgr *mgr, ushort thread_no) { uint cnt3 = 0, cnt2 = 0, cnt = 0; +uint entry, segment; BtLatchSet *latch; BtPage page; -uint entry; // flush dirty pool pages to the btree @@ -690,7 +718,7 @@ fprintf(stderr, "Start flushlsn "); bt_lockpage(BtLockRead, latch, thread_no); if( latch->dirty ) { - bt_writepage(mgr, page, latch->page_no, 0); + bt_writepage(mgr, page, latch->page_no); latch->dirty = 0, cnt++; } if( latch->pin & ~CLOCK_bit ) @@ -700,7 +728,9 @@ cnt2++; } fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2); fprintf(stderr, "begin sync"); - sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER); + for( segment = 0; segment < mgr->segments; segment++ ) + if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); fprintf(stderr, " end sync\n"); } @@ -714,8 +744,6 @@ uint offset = 0; BtKey *key; BtVal *val; - pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size); - hdr = (BtLogHdr *)mgr->redobuff; mgr->flushlsn = hdr->lsn; @@ -738,11 +766,11 @@ BtVal *val; } // recovery manager -- append new entry to recovery log -// flush to disk when it overflows. +// flush dirty pages to disk when it overflows. logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no) { -uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr); +uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr); uint amt = sizeof(BtLogHdr); BtLogHdr *hdr, *eof; uint last, end; @@ -757,7 +785,8 @@ uint last, end; if( amt > size - mgr->redoend ) { mgr->flushlsn = mgr->lsn; - msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC); + if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); mgr->redolast = 0; mgr->redoend = 0; bt_flushlsn(mgr, thread_no); @@ -788,22 +817,25 @@ uint last, end; memset (eof, 0, sizeof(BtLogHdr)); eof->lsn = mgr->lsn; - last = mgr->redolast & 0xfff; + last = mgr->redolast & ~0xfff; end = mgr->redoend; - mgr->redolast = end; - bt_releasemutex(mgr->redo); + if( end - last + sizeof(BtLogHdr) >= 32768 ) + if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); + else + mgr->redolast = end; - msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC); + bt_releasemutex(mgr->redo); return hdr->lsn; } // recovery manager -- append transaction to recovery log -// flush to disk when it overflows. +// flush dirty pages to disk when it overflows. logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no) { -uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr); +uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr); uint amt = 0, src, type; BtLogHdr *hdr, *eof; uint last, end; @@ -827,7 +859,8 @@ BtVal *val; if( amt > size - mgr->redoend ) { mgr->flushlsn = mgr->lsn; - msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC); + if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); mgr->redolast = 0; mgr->redoend = 0; bt_flushlsn (mgr, thread_no); @@ -876,30 +909,52 @@ BtVal *val; memset (eof, 0, sizeof(BtLogHdr)); eof->lsn = lsn; - last = mgr->redolast & 0xfff; + last = mgr->redolast & ~0xfff; end = mgr->redoend; - mgr->redolast = end; - bt_releasemutex(mgr->redo); - msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC); + if( end - last + sizeof(BtLogHdr) >= 32768 ) + if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); + else + mgr->redolast = end; + bt_releasemutex(mgr->redo); return lsn; } +// sync a single btree page to disk + +BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch) +{ +uint segment = latch->page_no >> 16; +BtPage perm; + + if( bt_writepage (mgr, page, latch->page_no) ) + return mgr->err; + + perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits)); + + if( msync (perm, mgr->page_size, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); + + latch->dirty = 0; + return 0; +} + // read page into buffer pool from permanent location in Btree file BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no) { int flag = PROT_READ | PROT_WRITE; -uint segment = page_no >> 32; -unsigned char *perm; +uint segment = page_no >> 16; +BtPage perm; while( 1 ) { if( segment < mgr->segments ) { - perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits); - memcpy (page, perm, mgr->page_size); -if( page->page_no != page_no ) + perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits)); +if( perm->page_no != page_no ) abort(); + memcpy (page, perm, mgr->page_size); mgr->reads++; return 0; } @@ -921,18 +976,18 @@ abort(); // write page to permanent location in Btree file // clear the dirty bit -BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit) +BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no) { int flag = PROT_READ | PROT_WRITE; -uint segment = page_no >> 32; -unsigned char *perm; +uint segment = page_no >> 16; +BtPage perm; while( 1 ) { if( segment < mgr->segments ) { - perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits); + perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits)); +if( page_no > LEAF_page && perm->page_no != page_no) +abort(); memcpy (perm, page, mgr->page_size); - if( syncit ) - msync (perm, mgr->page_size, MS_SYNC); mgr->writes++; return 0; } @@ -966,7 +1021,8 @@ void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch) BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch) { -BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool); +uid entry = latch - mgr->latchsets; +BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool); return page; } @@ -1082,7 +1138,7 @@ trynext: // no read-lock is required since page is not pinned. if( latch->dirty ) - if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) ) + if( mgr->err = bt_writepage (mgr, page, latch->page_no) ) return mgr->line = __LINE__, NULL; else latch->dirty = 0; @@ -1091,7 +1147,7 @@ trynext: memcpy (page, contents, mgr->page_size); latch->dirty = 1; } else if( bt_readpage (mgr, page, page_no) ) - return mgr->line = __LINE__, NULL; + return mgr->line = __LINE__, NULL; // link page as head of hash table chain // if this is a never before used entry, @@ -1112,7 +1168,6 @@ trynext: latch->pin = CLOCK_bit | 1; latch->page_no = page_no; - latch->entry = entry; latch->split = 0; bt_releasemutex (latch->modify); @@ -1136,8 +1191,6 @@ uint slot; if( mgr->redoend ) { eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend); memset (eof, 0, sizeof(BtLogHdr)); - - pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits); } // write remaining dirty pool pages to the btree @@ -1147,24 +1200,19 @@ uint slot; latch = mgr->latchsets + slot; if( latch->dirty ) { - bt_writepage(mgr, page, latch->page_no, 0); + bt_writepage(mgr, page, latch->page_no); latch->dirty = 0, num++; } } - // flush last batch to disk and clear - // redo recovery buffer on disk. - - fdatasync (mgr->idx); + // clear redo recovery buffer on disk. - if( mgr->redopages ) { + if( mgr->pagezero->redopages ) { eof = (BtLogHdr *)mgr->redobuff; memset (eof, 0, sizeof(BtLogHdr)); eof->lsn = mgr->lsn; - - pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits); - - sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER); + if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); } fprintf(stderr, "%d buffer pool pages flushed\n", num); @@ -1183,7 +1231,6 @@ uint slot; CloseHandle(mgr->hpool); #endif #ifdef unix - free (mgr->redobuff); close (mgr->idx); free (mgr); #else @@ -1303,17 +1350,18 @@ BtVal *val; memset (pagezero, 0, 1 << bits); pagezero->alloc->lvl = MIN_lvl - 1; pagezero->alloc->bits = mgr->page_bits; - bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1); + pagezero->redopages = redopages; + + bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1); pagezero->activepages = 2; // initialize left-most LEAF page in // alloc->left and count of active leaf pages. bt_putid (pagezero->alloc->left, LEAF_page); + ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits); - ftruncate (mgr->idx, REDO_page << mgr->page_bits); - - if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) { + if( bt_writepage (mgr, pagezero->alloc, 0) ) { fprintf (stderr, "Unable to create btree page zero\n"); return bt_mgrclose (mgr), NULL; } @@ -1340,7 +1388,7 @@ BtVal *val; pagezero->alloc->act = 1; pagezero->alloc->page_no = MIN_lvl - lvl; - if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) { + if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) { fprintf (stderr, "Unable to create btree page\n"); return bt_mgrclose (mgr), NULL; } @@ -1353,25 +1401,28 @@ mgrlatch: VirtualFree (pagezero, 0, MEM_RELEASE); #endif #ifdef unix - // mlock the pagezero page + // mlock the first segment of 64K pages flag = PROT_READ | PROT_WRITE; - mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits); - if( mgr->pagezero == MAP_FAILED ) { - fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno); + mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0); + mgr->segments = 1; + + if( mgr->pages[0] == MAP_FAILED ) { + fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno); return bt_mgrclose (mgr), NULL; } + + mgr->pagezero = (BtPageZero *)mgr->pages[0]; mlock (mgr->pagezero, mgr->page_size); + mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size; + mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits); + mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0); if( mgr->pagepool == MAP_FAILED ) { fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno); return bt_mgrclose (mgr), NULL; } - if( mgr->redopages = redopages ) { - ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits); - mgr->redobuff = valloc (redopages * mgr->page_size); - } #else flag = PAGE_READWRITE; mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL); @@ -1401,8 +1452,6 @@ mgrlatch: fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError()); return bt_mgrclose (mgr), NULL; } - if( mgr->redopages = redopages ) - mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE); #endif mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits)); @@ -1484,6 +1533,10 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no) WriteOLock (latch->atomic, thread_no); ReadLock (latch->readwr, thread_no); break; + case BtLockAtomic | BtLockWrite: + WriteOLock (latch->atomic, thread_no); + WriteLock (latch->readwr, thread_no); + break; } } @@ -1514,6 +1567,10 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch) WriteORelease (latch->atomic); ReadRelease (latch->readwr); break; + case BtLockAtomic | BtLockWrite: + WriteORelease (latch->atomic); + WriteRelease (latch->readwr); + break; } } @@ -1530,7 +1587,7 @@ int blk; bt_mutexlock(mgr->lock); // use empty chain first - // else allocate empty page + // else allocate new page if( page_no = bt_getid(mgr->pagezero->freechain) ) { if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) ) @@ -1538,13 +1595,24 @@ int blk; else return mgr->line = __LINE__, mgr->err = BTERR_struct; - bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right)); mgr->pagezero->activepages++; + bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right)); - bt_releasemutex(mgr->lock); + // the page is currently free and this + // will keep bt_txnpromote out. - memcpy (set->page, contents, mgr->page_size); + // contents will replace this bit + // and pin will keep bt_txnpromote out + + contents->page_no = page_no; set->latch->dirty = 1; + + memcpy (set->page, contents, mgr->page_size); + + if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); + + bt_releasemutex(mgr->lock); return 0; } @@ -1555,11 +1623,16 @@ int blk; // extend file into new page. mgr->pagezero->activepages++; - contents->page_no = page_no; - - pwrite (mgr->idx, contents, mgr->page_size, (uid)(page_no + 1) << mgr->page_bits); + if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); bt_releasemutex(mgr->lock); + // keep bt_txnpromote out of this page + + contents->free = 1; + contents->page_no = page_no; + pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits); + // don't load cache from btree page, load it from contents if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) ) @@ -1567,6 +1640,9 @@ int blk; else return mgr->err; + // now pin will keep bt_txnpromote out + + set->page->free = 0; return 0; } @@ -1608,10 +1684,11 @@ uint good = 0; int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no) { -uid page_no = ROOT_page, prevpage = 0; +uid page_no = ROOT_page, prevpage_no = 0; uint drill = 0xff, slot; BtLatchSet *prevlatch; uint mode, prevmode; +BtPage prevpage; BtVal *val; // start at root of btree and drill down @@ -1629,13 +1706,15 @@ BtVal *val; bt_lockpage(BtLockAccess, set->latch, thread_no); set->page = bt_mappage (mgr, set->latch); +if( set->latch->promote ) +abort(); // release & unpin parent or left sibling page - if( prevpage ) { + if( prevpage_no ) { bt_unlockpage(prevmode, prevlatch); bt_unpinlatch (mgr, prevlatch); - prevpage = 0; + prevpage_no = 0; } // obtain mode lock using lock chaining through AccessLock @@ -1663,8 +1742,9 @@ BtVal *val; } } - prevpage = set->latch->page_no; + prevpage_no = set->latch->page_no; prevlatch = set->latch; + prevpage = set->page; prevmode = mode; // find key on page at this level @@ -1707,6 +1787,7 @@ BtVal *val; // return page to free list // page must be delete & write locked +// and have no keys pointing to it. void bt_freepage (BtMgr *mgr, BtPageSet *set) { @@ -1718,20 +1799,24 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set) memcpy(set->page->right, mgr->pagezero->freechain, BtId); bt_putid(mgr->pagezero->freechain, set->latch->page_no); + set->latch->promote = 0; set->latch->dirty = 1; set->page->free = 1; // decrement active page count - // and unlock allocation page mgr->pagezero->activepages--; - bt_releasemutex (mgr->lock); + + if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 ) + fprintf(stderr, "msync error %d line %d\n", errno, __LINE__); // unlock released page + // and unlock allocation page bt_unlockpage (BtLockDelete, set->latch); bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (mgr, set->latch); + bt_releasemutex (mgr->lock); } // a fence key was deleted from a page @@ -1826,7 +1911,10 @@ uint idx; // delete a page and manage keys // call with page writelocked -BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no) +// returns with page removed +// from the page pool. + +BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, int delkey) { unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray]; unsigned char value[BtId]; @@ -1863,6 +1951,7 @@ BtKey *ptr; // pull contents of right peer into our empty page memcpy (set->page, right->page, mgr->page_size); + set->page->page_no = set->latch->page_no; set->latch->dirty = 1; // mark right page deleted and point it to left page @@ -1891,7 +1980,8 @@ BtKey *ptr; ptr = (BtKey*)lowerfence; - if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) ) + if( delkey ) + if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) ) return mgr->err; // obtain delete and write locks to right node @@ -1902,6 +1992,7 @@ BtKey *ptr; bt_freepage (mgr, right); bt_unlockpage (BtLockParent, set->latch); + bt_unpinlatch (mgr, set->latch); return 0; } @@ -1956,9 +2047,12 @@ BtVal *val; break; } + if( !found ) + return 0; + // did we delete a fence key in an upper level? - if( found && lvl && set->page->act && fence ) + if( lvl && set->page->act && fence ) if( bt_fixfence (mgr, set, lvl, thread_no) ) return mgr->err; else @@ -1966,7 +2060,7 @@ BtVal *val; // do we need to collapse root? - if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 ) + if( set->latch->page_no == ROOT_page && set->page->act == 1 ) if( bt_collapseroot (mgr, set, thread_no) ) return mgr->err; else @@ -1974,14 +2068,11 @@ BtVal *val; // delete empty page - if( !set->page->act ) { - if( bt_deletepage (mgr, set, thread_no) ) - return mgr->err; - } else { - set->latch->dirty = 1; - bt_unlockpage(BtLockWrite, set->latch); - } + if( !set->page->act ) + return bt_deletepage (mgr, set, thread_no, 1); + set->latch->dirty = 1; + bt_unlockpage(BtLockWrite, set->latch); bt_unpinlatch (mgr, set->latch); return 0; } @@ -2172,9 +2263,13 @@ uint nxt = mgr->page_size; unsigned char value[BtId]; BtPageSet left[1]; uid left_page_no; +BtPage frame; BtKey *ptr; BtVal *val; + frame = malloc (mgr->page_size); + memcpy (frame, root->page, mgr->page_size); + // save left page fence key for new root ptr = keyptr(root->page, root->page->cnt); @@ -2183,11 +2278,12 @@ BtVal *val; // Obtain an empty page to use, and copy the current // root contents into it, e.g. lower keys - if( bt_newpage(mgr, left, root->page, page_no) ) + if( bt_newpage(mgr, left, frame, page_no) ) return mgr->err; left_page_no = left->latch->page_no; bt_unpinlatch (mgr, left->latch); + free (frame); // preserve the page info at the bottom // of higher keys and set rest to zero @@ -2243,7 +2339,7 @@ BtVal *val; // split already locked full node // leave it locked. // return pool entry for new right -// page, unlocked +// page, pinned & unlocked uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no) { @@ -2352,13 +2448,14 @@ uint prev; bt_putid(set->page->right, right->latch->page_no); set->page->min = nxt; set->page->cnt = idx; + free(frame); - return right->latch->entry; + return right->latch - mgr->latchsets; } // fix keys for newly split page -// call with page locked, return -// unlocked +// call with both pages pinned & locked +// return unlocked and unpinned BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no) { @@ -2422,6 +2519,7 @@ uint idx, librarian; BtSlot *node; BtKey *ptr; BtVal *val; +int rate; // if found slot > desired slot and previous slot // is a librarian slot, use it @@ -2450,27 +2548,52 @@ BtVal *val; if( slotptr(set->page, idx)->dead ) break; - // now insert key into array before slot + // now insert key into array before slot, + // adding as many librarian slots as + // makes sense. - if( idx == set->page->cnt ) - idx += 2, set->page->cnt += 2, librarian = 2; - else - librarian = 1; + if( idx == set->page->cnt ) { + int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot); - set->latch->dirty = 1; - set->page->act++; + librarian = ++idx - slot; + avail /= sizeof(BtSlot); - while( idx > slot + librarian - 1 ) - *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--; + if( avail < 0 ) + avail = 0; - // add librarian slot + if( librarian > avail ) + librarian = avail; - if( librarian > 1 ) { - node = slotptr(set->page, slot++); - node->off = set->page->min; - node->type = Librarian; - node->dead = 1; + if( librarian ) { + rate = (idx - slot) / librarian; + set->page->cnt += librarian; + idx += librarian; + } else + rate = 0; + } else + librarian = 0, rate = 0; + + while( idx > slot ) { + // transfer slot + *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1); + idx--; + + // add librarian slot per rate + + if( librarian ) + if( (idx - slot + 1)/2 <= librarian * rate ) { +// if( rate && !(idx % rate) ) { + node = slotptr(set->page, idx--); + node->off = node[1].off; + node->type = Librarian; + node->dead = 1; + librarian--; + } } +if(librarian) +abort(); + set->latch->dirty = 1; + set->page->act++; // fill in new slot @@ -2594,23 +2717,6 @@ BtVal *val; return 0; } -typedef struct { - logseqno reqlsn; // redo log seq no required - logseqno lsn; // redo log sequence number - uint entry; // latch table entry number - uint slot:31; // page slot number - uint reuse:1; // reused previous page -} AtomicTxn; - -typedef struct { - uid page_no; // page number for split leaf - void *next; // next key to insert - uint entry:29; // latch table entry number - uint type:2; // 0 == insert, 1 == delete, 2 == free - uint nounlock:1; // don't unlock ParentModification - unsigned char leafkey[BT_keyarray]; -} AtomicKey; - // determine actual page where key is located // return slot number @@ -2653,7 +2759,7 @@ uint entry; return 0; } -BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no) +BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn) { BtKey *key = keyptr(source, src); BtVal *val = valptr(source, src); @@ -2665,17 +2771,19 @@ uint entry, slot; if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) { if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) ) return mgr->err; - set->page->lsn = locks[src].lsn; + set->page->lsn = lsn; return 0; } + // split page + if( entry = bt_splitpage (mgr, set, thread_no) ) latch = mgr->latchsets + entry; else return mgr->err; // splice right page into split chain - // and WriteLock it. + // and WriteLock it bt_lockpage(BtLockWrite, latch, thread_no); latch->split = set->latch->split; @@ -2689,7 +2797,7 @@ uint entry, slot; // perform delete from smaller btree // insert a delete slot if not found there -BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no) +BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn) { BtKey *key = keyptr(source, src); BtPageSet set[1]; @@ -2718,131 +2826,35 @@ BtVal *val; set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal); set->latch->dirty = 1; - set->page->lsn = locks[src].lsn; + set->page->lsn = lsn; set->page->act--; node->dead = 0; - mgr->found++; + __sync_fetch_and_add(&mgr->found, 1); return 0; } -// delete an empty master page for a transaction - -// note that the far right page never empties because -// it always contains (at least) the infinite stopper key -// and that all pages that don't contain any keys are -// deleted, or are being held under Atomic lock. - -BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no) +int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page) { -BtPageSet right[1], temp[1]; -unsigned char value[BtId]; -uid right_page_no; -BtKey *ptr; - - bt_lockpage(BtLockWrite, prev->latch, thread_no); - - // grab the right sibling - - if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) ) - right->page = bt_mappage (mgr, right->latch); - else - return mgr->err; - - bt_lockpage(BtLockAtomic, right->latch, thread_no); - bt_lockpage(BtLockWrite, right->latch, thread_no); +BtKey *key1 = (BtKey *)((char *)page + slot1->off); +BtKey *key2 = (BtKey *)((char *)page + slot2->off); - // and pull contents over empty page - // while preserving master's left link - - memcpy (right->page->left, prev->page->left, BtId); - memcpy (prev->page, right->page, mgr->page_size); - - // forward seekers to old right sibling - // to new page location in set - - bt_putid (right->page->right, prev->latch->page_no); - right->latch->dirty = 1; - right->page->kill = 1; - - // remove pointer to right page for searchers by - // changing right fence key to point to the master page - - ptr = keyptr(right->page,right->page->cnt); - bt_putid (value, prev->latch->page_no); - - if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) - return mgr->err; - - // now that master page is in good shape we can - // remove its locks. - - bt_unlockpage (BtLockAtomic, prev->latch); - bt_unlockpage (BtLockWrite, prev->latch); - - // fix master's right sibling's left pointer - // to remove scanner's poiner to the right page - - if( right_page_no = bt_getid (prev->page->right) ) { - if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) ) - temp->page = bt_mappage (mgr, temp->latch); - else - return mgr->err; - - bt_lockpage (BtLockWrite, temp->latch, thread_no); - bt_putid (temp->page->left, prev->latch->page_no); - temp->latch->dirty = 1; - - bt_unlockpage (BtLockWrite, temp->latch); - bt_unpinlatch (mgr, temp->latch); - } else { // master is now the far right page - bt_mutexlock (mgr->lock); - bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no); - bt_releasemutex(mgr->lock); - } - - // now that there are no pointers to the right page - // we can delete it after the last read access occurs - - bt_unlockpage (BtLockWrite, right->latch); - bt_unlockpage (BtLockAtomic, right->latch); - bt_lockpage (BtLockDelete, right->latch, thread_no); - bt_lockpage (BtLockWrite, right->latch, thread_no); - bt_freepage (mgr, right); - return 0; + return keycmp (key1, key2->key, key2->len); } - // atomic modification of a batch of keys. -// return -1 if BTERR is set -// otherwise return slot number -// causing the key constraint violation -// or zero on successful completion. - BTERR bt_atomictxn (BtDb *bt, BtPage source) { uint src, idx, slot, samepage, entry, que = 0; -AtomicKey *head, *tail, *leaf; -BtPageSet set[1], prev[1]; -unsigned char value[BtId]; BtKey *key, *ptr, *key2; -BtLatchSet *latch; -AtomicTxn *locks; int result = 0; BtSlot temp[1]; logseqno lsn; -BtPage page; -BtVal *val; -uid right; int type; - locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); - head = NULL; - tail = NULL; - // stable sort the list of keys into order to // prevent deadlocks between threads. - +/* for( src = 1; src++ < source->cnt; ) { *temp = *slotptr(source,src); key = keyptr (source,src); @@ -2856,15 +2868,47 @@ int type; break; } } - +*/ + qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source); // add entries to redo log - if( bt->mgr->redopages ) - if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) ) - for( src = 0; src++ < source->cnt; ) - locks[src].lsn = lsn; - else - return bt->mgr->err; + if( bt->mgr->pagezero->redopages ) + lsn = bt_txnredo (bt->mgr, source, bt->thread_no); + else + lsn = 0; + + // perform the individual actions in the transaction + + if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) ) + return bt->mgr->err; + + // if number of active pages + // is greater than the buffer pool + // promote page into larger btree + + if( bt->main ) + while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 ) + if( bt_txnpromote (bt) ) + return bt->mgr->err; + + // return success + + return 0; +} + +BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no) +{ +uint src, idx, slot, samepage, entry, que = 0; +BtPageSet set[1], prev[1]; +unsigned char value[BtId]; +BtLatchSet *latch; +AtomicTxn *locks; +BtKey *key, *ptr; +BtPage page; +BtVal *val; +uid right; + + locks = calloc (source->cnt + 1, sizeof(AtomicTxn)); // Load the leaf page for each key // group same page references with reuse bit @@ -2881,14 +2925,12 @@ int type; if( samepage = src > 1 ) if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 ) slot = bt_findslot(set->page, key->key, key->len); - else - bt_unlockpage(BtLockRead, set->latch); if( !slot ) - if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) ) + if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic, thread_no) ) set->latch->split = 0; else - return bt->mgr->err; + return mgr->err; if( slotptr(set->page, slot)->type == Librarian ) ptr = keyptr(set->page, ++slot); @@ -2896,7 +2938,7 @@ int type; ptr = keyptr(set->page, slot); if( !samepage ) { - locks[src].entry = set->latch->entry; + locks[src].entry = set->latch - mgr->latchsets; locks[src].slot = slot; locks[src].reuse = 0; } else { @@ -2910,29 +2952,18 @@ int type; locks[src].reqlsn = set->page->lsn; } - // unlock last loadpage lock - - if( source->cnt ) - bt_unlockpage(BtLockRead, set->latch); - // obtain write lock for each master page - // sync flushed pages to disk for( src = 0; src++ < source->cnt; ) { if( locks[src].reuse ) continue; - set->latch = bt->mgr->latchsets + locks[src].entry; - bt_lockpage (BtLockWrite, set->latch, bt->thread_no); + set->latch = mgr->latchsets + locks[src].entry; + bt_lockpage (BtLockWrite, set->latch, thread_no); } // insert or delete each key // process any splits or merges - // release Write & Atomic latches - // set ParentModifications and build - // queue of keys to insert for split pages - // or delete for deleted pages. - // run through txn list backwards samepage = source->cnt + 1; @@ -2948,227 +2979,158 @@ int type; for( idx = src; idx < samepage; idx++ ) switch( slotptr(source,idx)->type ) { case Delete: - if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) ) - return bt->mgr->err; + if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) ) + return mgr->err; break; case Duplicate: case Unique: - if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) ) - return bt->mgr->err; + if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) ) + return mgr->err; + break; + + default: + bt_atomicpage (mgr, source, locks, idx, set); break; } // after the same page operations have finished, // process master page for splits or deletion. - latch = prev->latch = bt->mgr->latchsets + locks[src].entry; - prev->page = bt_mappage (bt->mgr, prev->latch); + latch = prev->latch = mgr->latchsets + locks[src].entry; + prev->page = bt_mappage (mgr, prev->latch); samepage = src; // pick-up all splits from master page - // each one is already WriteLocked. - - entry = prev->latch->split; + // each one is already pinned & WriteLocked. - while( entry ) { - set->latch = bt->mgr->latchsets + entry; - set->page = bt_mappage (bt->mgr, set->latch); - entry = set->latch->split; + if( entry = prev->latch->split ) do { + set->latch = mgr->latchsets + entry; + set->page = bt_mappage (mgr, set->latch); // delete empty master page by undoing its split // (this is potentially another empty page) - // note that there are no new left pointers yet + // note that there are no pointers to it yet if( !prev->page->act ) { memcpy (set->page->left, prev->page->left, BtId); - memcpy (prev->page, set->page, bt->mgr->page_size); - bt_lockpage (BtLockDelete, set->latch, bt->thread_no); - bt_freepage (bt->mgr, set); - + memcpy (prev->page, set->page, mgr->page_size); + bt_lockpage (BtLockDelete, set->latch, thread_no); prev->latch->split = set->latch->split; prev->latch->dirty = 1; + bt_freepage (mgr, set); continue; } - // remove empty page from the split chain - // and return it to the free list. + // remove empty split page from the split chain + // and return it to the free list. No other + // thread has its page number yet. if( !set->page->act ) { memcpy (prev->page->right, set->page->right, BtId); prev->latch->split = set->latch->split; - bt_lockpage (BtLockDelete, set->latch, bt->thread_no); - bt_freepage (bt->mgr, set); + + bt_lockpage (BtLockDelete, set->latch, thread_no); + bt_freepage (mgr, set); continue; } - // schedule prev fence key update + // update prev's fence key ptr = keyptr(prev->page,prev->page->cnt); - leaf = calloc (sizeof(AtomicKey), 1), que++; - - memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); - leaf->page_no = prev->latch->page_no; - leaf->entry = prev->latch->entry; - leaf->type = 0; + bt_putid (value, prev->latch->page_no); - if( tail ) - tail->next = leaf; - else - head = leaf; - - tail = leaf; + if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) + return mgr->err; // splice in the left link into the split page bt_putid (set->page->left, prev->latch->page_no); - bt_lockpage(BtLockParent, prev->latch, bt->thread_no); + + if( lsm ) + bt_syncpage (mgr, prev->page, prev->latch); + + // page is unpinned below to avoid bt_txnpromote + bt_unlockpage(BtLockWrite, prev->latch); *prev = *set; - } + } while( entry = prev->latch->split ); // update left pointer in next right page from last split page - // (if all splits were reversed, latch->split == 0) + // (if all splits were reversed or none occurred, latch->split == 0) if( latch->split ) { // fix left pointer in master's original (now split) // far right sibling or set rightmost page in page zero if( right = bt_getid (prev->page->right) ) { - if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) ) - set->page = bt_mappage (bt->mgr, set->latch); + if( set->latch = bt_pinlatch (mgr, right, NULL, thread_no) ) + set->page = bt_mappage (mgr, set->latch); else - return bt->mgr->err; + return mgr->err; - bt_lockpage (BtLockWrite, set->latch, bt->thread_no); + bt_lockpage (BtLockWrite, set->latch, thread_no); bt_putid (set->page->left, prev->latch->page_no); set->latch->dirty = 1; + bt_unlockpage (BtLockWrite, set->latch); - bt_unpinlatch (bt->mgr, set->latch); + bt_unpinlatch (mgr, set->latch); } else { // prev is rightmost page - bt_mutexlock (bt->mgr->lock); - bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); - bt_releasemutex(bt->mgr->lock); + bt_mutexlock (mgr->lock); + bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no); + bt_releasemutex(mgr->lock); } // Process last page split in chain + // by switching the key from the master + // page to the last split. ptr = keyptr(prev->page,prev->page->cnt); - leaf = calloc (sizeof(AtomicKey), 1), que++; + bt_putid (value, prev->latch->page_no); - memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); - leaf->page_no = prev->latch->page_no; - leaf->entry = prev->latch->entry; - leaf->type = 0; - - if( tail ) - tail->next = leaf; - else - head = leaf; - - tail = leaf; + if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) + return mgr->err; - bt_lockpage(BtLockParent, prev->latch, bt->thread_no); bt_unlockpage(BtLockWrite, prev->latch); - // remove atomic lock on master page + if( lsm ) + bt_syncpage (mgr, prev->page, prev->latch); bt_unlockpage(BtLockAtomic, latch); + bt_unpinlatch(mgr, latch); + + // go through the list of splits and + // release the latch pins + + while( entry = latch->split ) { + latch = mgr->latchsets + entry; + bt_unpinlatch(mgr, latch); + } + continue; } - // finished if prev page occupied (either master or final split) + // since there are no splits, we're + // finished if master page occupied if( prev->page->act ) { - bt_unlockpage(BtLockWrite, latch); - bt_unlockpage(BtLockAtomic, latch); - bt_unpinlatch(bt->mgr, latch); + bt_unlockpage(BtLockAtomic, prev->latch); + bt_unlockpage(BtLockWrite, prev->latch); + + if( lsm ) + bt_syncpage (mgr, prev->page, prev->latch); + + bt_unpinlatch(mgr, prev->latch); continue; } // any and all splits were reversed, and the // master page located in prev is empty, delete it - // by pulling over master's right sibling. - - // Remove empty master's fence key - - ptr = keyptr(prev->page,prev->page->cnt); - - if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) ) - return bt->mgr->err; - - // perform the remainder of the delete - // from the FIFO queue - leaf = calloc (sizeof(AtomicKey), 1), que++; - - memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); - leaf->page_no = prev->latch->page_no; - leaf->entry = prev->latch->entry; - leaf->nounlock = 1; - leaf->type = 2; - - if( tail ) - tail->next = leaf; - else - head = leaf; - - tail = leaf; - - // leave atomic lock in place until - // deletion completes in next phase. - - bt_unlockpage(BtLockWrite, prev->latch); + if( bt_deletepage (mgr, prev, thread_no, 1) ) + return mgr->err; } - // add & delete keys for any pages split or merged during transaction - - if( leaf = head ) - do { - set->latch = bt->mgr->latchsets + leaf->entry; - set->page = bt_mappage (bt->mgr, set->latch); - - bt_putid (value, leaf->page_no); - ptr = (BtKey *)leaf->leafkey; - - switch( leaf->type ) { - case 0: // insert key - if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) ) - return bt->mgr->err; - - break; - - case 1: // delete key - if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) ) - return bt->mgr->err; - - break; - - case 2: // free page - if( bt_atomicfree (bt->mgr, set, bt->thread_no) ) - return bt->mgr->err; - - break; - } - - if( !leaf->nounlock ) - bt_unlockpage (BtLockParent, set->latch); - - bt_unpinlatch (bt->mgr, set->latch); - tail = leaf->next; - free (leaf); - } while( leaf = tail ); - - // if number of active pages - // is greater than the buffer pool - // promote page into larger btree - - while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 ) - if( bt_txnpromote (bt) ) - return bt->mgr->err; - - // return success - free (locks); return 0; } @@ -3195,21 +3157,14 @@ BtVal *val; continue; set->latch = bt->mgr->latchsets + entry; - idx = set->latch->page_no % bt->mgr->latchhash; if( !bt_mutextry(set->latch->modify) ) continue; -// if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) { -// bt_releasemutex(set->latch->modify); -// continue; -// } - // skip this entry if it is pinned if( set->latch->pin & ~CLOCK_bit ) { bt_releasemutex(set->latch->modify); -// bt_releasemutex(bt->mgr->hashtable[idx].latch); continue; } @@ -3219,70 +3174,51 @@ BtVal *val; if( !set->latch->page_no || !bt_getid (set->page->right) ) { bt_releasemutex(set->latch->modify); -// bt_releasemutex(bt->mgr->hashtable[idx].latch); continue; } - bt_lockpage (BtLockAccess, set->latch, bt->thread_no); - bt_lockpage (BtLockWrite, set->latch, bt->thread_no); - bt_unlockpage(BtLockAccess, set->latch); - - // entry interiour node or being or was killed + // entry interiour node or being killed or promoted if( set->page->lvl || set->page->free || set->page->kill ) { bt_releasemutex(set->latch->modify); -// bt_releasemutex(bt->mgr->hashtable[idx].latch); - bt_unlockpage(BtLockWrite, set->latch); continue; } // pin the page for our useage set->latch->pin++; + set->latch->promote = 1; bt_releasemutex(set->latch->modify); -// bt_releasemutex(bt->mgr->hashtable[idx].latch); - - // if page is dirty, then - // sync it to the disk first. - if( set->latch->dirty ) - if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) ) - return bt->mgr->line = __LINE__, bt->mgr->err; - else - set->latch->dirty = 0; + bt_lockpage (BtLockWrite, set->latch, bt->thread_no); - // transfer slots in our selected page to larger btree -if( !(set->latch->page_no % 100) ) -fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt); + // remove the key for the page + // and wait for other threads to fade away - for( slot = 0; slot++ < set->page->cnt; ) { - ptr = keyptr (set->page, slot); - val = valptr (set->page, slot); - node = slotptr(set->page, slot); + ptr = keyptr(set->page, set->page->cnt); - switch( node->type ) { - case Duplicate: - case Unique: - if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) ) - return bt->main->err; + if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) ) + return bt->mgr->err; - continue; + bt_unlockpage (BtLockWrite, set->latch); +while( (set->latch->pin & ~CLOCK_bit) > 1 ) +sched_yield(); + bt_lockpage (BtLockDelete, set->latch, bt->thread_no); + bt_lockpage (BtLockAtomic, set->latch, bt->thread_no); + bt_lockpage (BtLockWrite, set->latch, bt->thread_no); - case Delete: - if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) ) - return bt->main->err; + // transfer slots in our selected page to larger btree +if( !(set->latch->page_no % 100) ) +fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act); - continue; - } - } + if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) + return bt->main->err; // now delete the page - if( bt_deletepage (bt->mgr, set, bt->thread_no) ) - return bt->mgr->err; - - bt_unpinlatch (bt->mgr, set->latch); - return 0; + bt_unlockpage (BtLockDelete, set->latch); + bt_unlockpage (BtLockAtomic, set->latch); + return bt_deletepage (bt->mgr, set, bt->thread_no, 0); } } @@ -3698,7 +3634,7 @@ FILE *in; posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL); #endif fprintf(stderr, "started counting\n"); - next = LEAF_page + bt->mgr->redopages + 1; + next = REDO_page + bt->mgr->pagezero->redopages; while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) { if( bt_readpage (bt->mgr, bt->cursor, page_no) ) @@ -3716,7 +3652,6 @@ FILE *in; break; } - fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found); bt_close (bt); #ifdef unix return NULL; @@ -3738,8 +3673,8 @@ pthread_t *threads; HANDLE *threads; #endif ThreadArg *args; +uint redopages = 0; uint poolsize = 0; -uint recovery = 0; uint mainpool = 0; uint mainbits = 0; float elapsed; @@ -3779,7 +3714,10 @@ BtKey *ptr; num = atoi(argv[6]); if( argc > 7 ) - recovery = atoi(argv[7]); + redopages = atoi(argv[7]); + + if( redopages + REDO_page > 65535 ) + fprintf (stderr, "Warning: Recovery buffer too large\n"); if( argc > 8 ) mainbits = atoi(argv[8]); @@ -3795,19 +3733,22 @@ BtKey *ptr; #endif args = malloc ((cnt + 1) * sizeof(ThreadArg)); - mgr = bt_mgr (argv[1], bits, poolsize, recovery); + mgr = bt_mgr (argv[1], bits, poolsize, redopages); if( !mgr ) { fprintf(stderr, "Index Open Error %s\n", argv[1]); exit (1); } - main = bt_mgr (argv[2], mainbits, mainpool, 0); + if( mainbits ) { + main = bt_mgr (argv[2], mainbits, mainpool, 0); - if( !main ) { - fprintf(stderr, "Index Open Error %s\n", argv[2]); - exit (1); - } + if( !main ) { + fprintf(stderr, "Index Open Error %s\n", argv[2]); + exit (1); + } + } else + main = NULL; // fire off threads @@ -3848,9 +3789,14 @@ BtKey *ptr; CloseHandle(threads[idx]); #endif bt_poolaudit(mgr); - bt_poolaudit(main); - bt_mgrclose (main); + if( main ) + bt_poolaudit(main); + + fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found); + + if( main ) + bt_mgrclose (main); bt_mgrclose (mgr); elapsed = getCpuTime(0) - start;