From 4fa440cc8ba094e8f043922450fe5e1e06e374cf Mon Sep 17 00:00:00 2001 From: unknown Date: Sun, 12 Oct 2014 08:05:10 -0700 Subject: [PATCH] Progress made on LSM B-tree --- threadskv10.c | 1293 ++++++++++++++++++++++++------------------------- 1 file changed, 637 insertions(+), 656 deletions(-) diff --git a/threadskv10.c b/threadskv10.c index c524a48..129434f 100644 --- a/threadskv10.c +++ b/threadskv10.c @@ -1,4 +1,4 @@ -// btree version threadskv10 FUTEX version +// btree version threadskv10 futex version // with reworked bt_deletekey code, // phase-fair reader writer lock, // librarian page split code, @@ -7,9 +7,9 @@ // traditional buffer pool manager // ACID batched key-value updates // redo log for failure recovery -// and dual B-trees for write optimization +// and LSM B-trees for write optimization -// 09 OCT 2014 +// 11 OCT 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -110,6 +110,41 @@ typedef enum{ BtLockAtomic = 32 } BtLock; +// lite weight spin latch + +volatile typedef struct { + ushort exclusive:1; + ushort pending:1; + ushort share:14; +} BtMutexLatch; + +#define XCL 1 +#define PEND 2 +#define BOTH 3 +#define SHARE 4 + +/* +// exclusive is set for write access + +volatile typedef struct { + unsigned char exclusive[1]; + unsigned char filler; +} BtMutexLatch; +*/ +/* +typedef struct { + union { + struct { + volatile uint xlock:1; // one writer has exclusive lock + volatile uint wrt:31; // count of other writers waiting + } bits[1]; + uint value[1]; + }; +} BtMutexLatch; +*/ +#define XCL 1 +#define WRT 2 + // definition for phase-fair reader/writer lock implementation typedef struct { @@ -117,16 +152,16 @@ typedef struct { volatile ushort rout[1]; volatile ushort ticket[1]; volatile ushort serving[1]; - ushort tid; - ushort dup; + volatile ushort tid[1]; + volatile ushort dup[1]; } RWLock; // write only lock typedef struct { - volatile uint exclusive[1]; - ushort tid; - ushort dup; + BtMutexLatch xcl[1]; + volatile ushort tid[1]; + 
volatile ushort dup[1]; } WOLock; #define PHID 0x1 @@ -134,16 +169,6 @@ typedef struct { #define MASK 0x3 #define RINC 0x4 -// lite weight mutex - -// exclusive is set for write access - -typedef struct { - volatile uint exclusive[1]; -} BtMutexLatch; - -#define XCL 1 - // mode & definition for lite latch implementation enum { @@ -172,7 +197,6 @@ typedef struct { uint prev; // prev entry in hash table chain ushort pin; // number of accessing threads unsigned char dirty; // page in cache is dirty (atomic set) - unsigned char avail; // page is an available entry BtMutexLatch modify[1]; // modify entry lite latch } BtLatchSet; @@ -203,8 +227,7 @@ typedef enum { Unique, Librarian, Duplicate, - Delete, - Update + Delete } BtSlotType; typedef struct { @@ -267,9 +290,10 @@ typedef struct { // structure for latch manager on ALLOC_page typedef struct { - struct BtPage_ alloc[1]; // next page_no in right ptr - unsigned long long dups[1]; // global duplicate key uniqueifier - unsigned char chain[BtId]; // head of free page_nos chain + struct BtPage_ alloc[1]; // next page_no in right ptr + unsigned long long dups[1]; // global duplicate key uniqueifier + unsigned char freechain[BtId]; // head of free page_nos chain + unsigned long long activepages; // number of active pages pages } BtPageZero; // The object structure for Btree access @@ -288,18 +312,17 @@ typedef struct { unsigned char *pagepool; // mapped to the buffer pool pages unsigned char *redobuff; // mapped recovery buffer pointer logseqno lsn, flushlsn; // current & first lsn flushed - BtMutexLatch dump[1]; // redo dump lite latch BtMutexLatch redo[1]; // redo area lite latch BtMutexLatch lock[1]; // allocation area lite latch + BtMutexLatch maps[1]; // mapping segments lite latch ushort thread_no[1]; // next thread number uint nlatchpage; // number of latch pages at BT_latch uint latchtotal; // number of page latch entries uint latchhash; // number of latch hash table slots uint latchvictim; // next latch entry to 
examine - uint latchavail; // next available latch entry - uint availlock[1]; // latch available commitments - uint available; // number of available latches + uint latchpromote; // next latch entry to promote uint redopages; // size of recovery buff in pages + uint redolast; // last msync size of recovery buff uint redoend; // eof/end element in recovery buff int err; // last error int line; // last error line no @@ -309,6 +332,8 @@ typedef struct { HANDLE halloc; // allocation handle HANDLE hpool; // buffer pool handle #endif + uint segments; // number of memory mapped segments + unsigned char *pages[64000];// memory mapped segments of b-tree } BtMgr; typedef struct { @@ -330,8 +355,7 @@ typedef enum { BTERR_read, BTERR_wrt, BTERR_atomic, - BTERR_recovery, - BTERR_avail + BTERR_recovery } BTERR; #define CLOCK_bit 0x8000 @@ -363,11 +387,11 @@ typedef struct { extern void bt_close (BtDb *bt); extern BtDb *bt_open (BtMgr *mgr, BtMgr *main); -extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no); +extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit); extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no); extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no); extern void bt_unlockpage(BtLock mode, BtLatchSet *latch); -extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update, ushort thread_no); +extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no); extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no); extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax); @@ -383,6 +407,9 @@ extern void bt_mgrclose (BtMgr *mgr); extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no); extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort 
thread_no); +// transaction functions +BTERR bt_txnpromote (BtDb *bt); + // The page is allocated from low and hi ends. // The key slots are allocated from the bottom, // while the text and value of the key @@ -461,47 +488,170 @@ uid bt_newdup (BtMgr *mgr) #endif } -// Write-Only Queue Lock +// lite weight spin lock Latch Manager -void WriteOLock (WOLock *lock, ushort tid) +int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3) { -uint prev; + return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); +} + +void bt_mutexlock(BtMutexLatch *latch) +{ +ushort prev; - if( lock->tid == tid ) { - lock->dup++; + do { +#ifdef unix + prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL); +#else + prev = _InterlockedOr16((ushort *)latch, PEND | XCL); +#endif + if( !(prev & XCL) ) + if( !(prev & ~BOTH) ) + return; + else +#ifdef unix + __sync_fetch_and_and ((ushort *)latch, ~XCL); +#else + _InterlockedAnd16((ushort *)latch, ~XCL); +#endif +#ifdef unix + } while( sched_yield(), 1 ); +#else + } while( SwitchToThread(), 1 ); +#endif +/* +unsigned char prev; + + do { +#ifdef unix + prev = __sync_fetch_and_or(latch->exclusive, XCL); +#else + prev = _InterlockedOr8(latch->exclusive, XCL); +#endif + if( !(prev & XCL) ) + return; +#ifdef unix + } while( sched_yield(), 1 ); +#else + } while( SwitchToThread(), 1 ); +#endif +*/ +/* +BtMutexLatch prev[1]; +uint slept = 0; + + while( 1 ) { + *prev->value = __sync_fetch_and_or(latch->value, XCL); + + if( !prev->bits->xlock ) { // did we set XCL bit? 
+ if( slept ) + __sync_fetch_and_sub(latch->value, WRT); return; } - while( 1 ) { + if( !slept ) { + prev->bits->wrt++; + __sync_fetch_and_add(latch->value, WRT); + } + + sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr); + slept = 1; + } */ +} + +// try to obtain write lock + +// return 1 if obtained, +// 0 otherwise + +int bt_mutextry(BtMutexLatch *latch) +{ +ushort prev; + +#ifdef unix + prev = __sync_fetch_and_or((ushort *)latch, XCL); +#else + prev = _InterlockedOr16((ushort *)latch, XCL); +#endif + // take write access if all bits are clear + + if( !(prev & XCL) ) + if( !(prev & ~BOTH) ) + return 1; + else #ifdef unix - prev = __sync_fetch_and_or (lock->exclusive, 1); + __sync_fetch_and_and ((ushort *)latch, ~XCL); #else - prev = _InterlockedExchangeOr (lock->exclusive, 1); + _InterlockedAnd16((ushort *)latch, ~XCL); #endif - if( !(prev & XCL) ) { - lock->tid = tid; - return; - } + return 0; +/* +unsigned char prev; + +#ifdef unix + prev = __sync_fetch_and_or(latch->exclusive, XCL); +#else + prev = _InterlockedOr8(latch->exclusive, XCL); +#endif + // take write access if all bits are clear + + return !(prev & XCL); +*/ +/* +BtMutexLatch prev[1]; + + *prev->value = __sync_fetch_and_or(latch->value, XCL); + + // take write access if exclusive bit is clear + + return !prev->bits->xlock; +*/ +} + +// clear write mode + +void bt_releasemutex(BtMutexLatch *latch) +{ #ifdef unix - sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr ); + __sync_fetch_and_and((ushort *)latch, ~BOTH); #else - SwitchToThread (); + _InterlockedAnd16((ushort *)latch, ~BOTH); #endif +/* + *latch->exclusive = 0; +*/ +/* +BtMutexLatch prev[1]; + + *prev->value = __sync_fetch_and_and(latch->value, ~XCL); + + if( prev->bits->wrt ) + sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ); +*/ +} + +// Write-Only Queue Lock + +void WriteOLock (WOLock *lock, ushort tid) +{ + if( *lock->tid == tid ) { + *lock->dup += 1; + return; } 
+ + bt_mutexlock(lock->xcl); + *lock->tid = tid; } void WriteORelease (WOLock *lock) { - if( lock->dup ) { - lock->dup--; + if( *lock->dup ) { + *lock->dup -= 1; return; } - *lock->exclusive = 0; - lock->tid = 0; -#ifdef linux - sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ); -#endif + *lock->tid = 0; + bt_releasemutex(lock->xcl); } // Phase-Fair reader/writer lock implementation @@ -510,8 +660,8 @@ void WriteLock (RWLock *lock, ushort tid) { ushort w, r, tix; - if( lock->tid == tid ) { - lock->dup++; + if( *lock->tid == tid ) { + *lock->dup += 1; return; } #ifdef unix @@ -540,17 +690,17 @@ ushort w, r, tix; #else SwitchToThread(); #endif - lock->tid = tid; + *lock->tid = tid; } void WriteRelease (RWLock *lock) { - if( lock->dup ) { - lock->dup--; + if( *lock->dup ) { + *lock->dup -= 1; return; } - lock->tid = 0; + *lock->tid = 0; #ifdef unix __sync_fetch_and_and (lock->rin, ~MASK); #else @@ -568,8 +718,8 @@ ushort w; // OK if write lock already held by same thread - if( lock->tid == tid ) { - lock->dup++; + if( *lock->tid == tid ) { + *lock->dup += 1; return 1; } #ifdef unix @@ -593,8 +743,9 @@ ushort w; void ReadLock (RWLock *lock, ushort tid) { ushort w; - if( lock->tid == tid ) { - lock->dup++; + + if( *lock->tid == tid ) { + *lock->dup += 1; return; } #ifdef unix @@ -613,8 +764,8 @@ ushort w; void ReadRelease (RWLock *lock) { - if( lock->dup ) { - lock->dup--; + if( *lock->dup ) { + *lock->dup -= 1; return; } @@ -625,62 +776,6 @@ void ReadRelease (RWLock *lock) #endif } -// lite weight spin lock Latch Manager - -int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3) -{ - return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); -} - -void bt_mutexlock(BtMutexLatch *latch) -{ -uint prev; - - while( 1 ) { -#ifdef unix - prev = __sync_fetch_and_or(latch->exclusive, XCL); -#else - prev = _InterlockedOr(latch->exclusive, XCL); -#endif - if( !(prev & XCL) ) - return; -#ifdef unix - 
sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr ); -#else - SwitchToThread(); -#endif - } -} - -// try to obtain write lock - -// return 1 if obtained, -// 0 otherwise - -int bt_mutextry(BtMutexLatch *latch) -{ -uint prev; - -#ifdef unix - prev = __sync_fetch_and_or(latch->exclusive, XCL); -#else - prev = _InterlockedOr(latch->exclusive, XCL); -#endif - // take write access if exclusive bit is clear - - return !(prev & XCL); -} - -// clear write mode - -void bt_releasemutex(BtMutexLatch *latch) -{ - *latch->exclusive = 0; -#ifdef unix - sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ); -#endif -} - // recovery manager -- flush dirty pages void bt_flushlsn (BtMgr *mgr, ushort thread_no) @@ -692,7 +787,7 @@ uint entry; // flush dirty pool pages to the btree -fprintf(stderr, "Start flushlsn\n"); +fprintf(stderr, "Start flushlsn "); for( entry = 1; entry < mgr->latchtotal; entry++ ) { page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool); latch = mgr->latchsets + entry; @@ -700,17 +795,18 @@ fprintf(stderr, "Start flushlsn\n"); bt_lockpage(BtLockRead, latch, thread_no); if( latch->dirty ) { - bt_writepage(mgr, page, latch->page_no); + bt_writepage(mgr, page, latch->page_no, 0); latch->dirty = 0, cnt++; } -if( latch->avail ) -cnt3++; if( latch->pin & ~CLOCK_bit ) cnt2++; bt_unlockpage(BtLockRead, latch); bt_releasemutex (latch->modify); } -fprintf(stderr, "End flushlsn %d pages %d pinned %d available\n", cnt, cnt2, cnt3); +fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2); +fprintf(stderr, "begin sync"); + sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER); +fprintf(stderr, " end sync\n"); } // recovery manager -- process current recovery buff on startup @@ -746,36 +842,6 @@ BtVal *val; } } -// recovery manager -- dump current recovery buff & flush dirty pages -// in preparation for next recovery buffer. 
- -BTERR bt_dumpredo (BtMgr *mgr) -{ -BtLogHdr *eof; -fprintf(stderr, "Flush pages "); - - eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend); - memset (eof, 0, sizeof(BtLogHdr)); - - // flush pages written at beginning of this redo buffer - // then write the redo buffer out to disk - - fdatasync (mgr->idx); - -fprintf(stderr, "Dump ReDo: %d bytes\n", mgr->redoend); - pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits); - - sync_file_range (mgr->idx, REDO_page << mgr->page_bits, mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER); - - mgr->flushlsn = mgr->lsn; - mgr->redoend = 0; - - eof = (BtLogHdr *)(mgr->redobuff); - memset (eof, 0, sizeof(BtLogHdr)); - eof->lsn = mgr->lsn; - return 0; -} - // recovery manager -- append new entry to recovery log // flush to disk when it overflows. @@ -784,7 +850,7 @@ logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ush uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr); uint amt = sizeof(BtLogHdr); BtLogHdr *hdr, *eof; -uint flush; +uint last, end; bt_mutexlock (mgr->redo); @@ -794,11 +860,12 @@ uint flush; // see if new entry fits in buffer // flush and reset if it doesn't - if( flush = amt > size - mgr->redoend ) { - bt_mutexlock (mgr->dump); - - if( bt_dumpredo (mgr) ) - return 0; + if( amt > size - mgr->redoend ) { + mgr->flushlsn = mgr->lsn; + msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC); + mgr->redolast = 0; + mgr->redoend = 0; + bt_flushlsn(mgr, thread_no); } // fill in new entry & either eof or end block @@ -822,13 +889,17 @@ uint flush; memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal)); } - bt_releasemutex(mgr->redo); + eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend); + memset (eof, 0, sizeof(BtLogHdr)); + eof->lsn = mgr->lsn; - if( flush ) { - bt_flushlsn (mgr, thread_no); - bt_releasemutex(mgr->dump); - } + last = 
mgr->redolast & 0xfff; + end = mgr->redoend; + mgr->redolast = end; + bt_releasemutex(mgr->redo); + + msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC); return hdr->lsn; } @@ -840,8 +911,8 @@ logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no) uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr); uint amt = 0, src, type; BtLogHdr *hdr, *eof; +uint last, end; logseqno lsn; -uint flush; BtKey *key; BtVal *val; @@ -859,11 +930,12 @@ BtVal *val; // see if new entry fits in buffer // flush and reset if it doesn't - if( flush = amt > size - mgr->redoend ) { - bt_mutexlock (mgr->dump); - - if( bt_dumpredo (mgr) ) - return 0; + if( amt > size - mgr->redoend ) { + mgr->flushlsn = mgr->lsn; + msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC); + mgr->redolast = 0; + mgr->redoend = 0; + bt_flushlsn (mgr, thread_no); } // assign new lsn to transaction @@ -886,9 +958,6 @@ BtVal *val; case Delete: type = BTRM_del; break; - case Update: - type = BTRM_upd; - break; } amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal); @@ -910,14 +979,15 @@ BtVal *val; eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend); memset (eof, 0, sizeof(BtLogHdr)); + eof->lsn = lsn; + last = mgr->redolast & 0xfff; + end = mgr->redoend; + mgr->redolast = end; bt_releasemutex(mgr->redo); - if( flush ) { - bt_flushlsn (mgr, thread_no); - bt_releasemutex(mgr->dump); - } - + msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC); + bt_releasemutex(mgr->redo); return lsn; } @@ -927,60 +997,82 @@ BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no) { off64_t off = page_no << mgr->page_bits; -#ifdef unix if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) { fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno); - return mgr->err = BTERR_read; - } -#else -OVERLAPPED ovl[1]; -uint amt[1]; - - memset (ovl, 0, sizeof(OVERLAPPED)); - 
ovl->Offset = off; - ovl->OffsetHigh = off >> 32; - - if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) { - fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError()); return BTERR_read; } - if( *amt < mgr->page_size ) { - fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError()); - return BTERR_read; - } -#endif +if( page->page_no != page_no ) +abort(); mgr->reads++; return 0; +/* +int flag = PROT_READ | PROT_WRITE; +uint segment = page_no >> 32; +unsigned char *perm; + + while( 1 ) { + if( segment < mgr->segments ) { + perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits); + memcpy (page, perm, mgr->page_size); +if( page->page_no != page_no ) +abort(); + mgr->reads++; + return 0; + } + + bt_mutexlock (mgr->maps); + + if( segment < mgr->segments ) { + bt_releasemutex (mgr->maps); + continue; + } + + mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16)); + mgr->segments++; + + bt_releasemutex (mgr->maps); + } */ } // write page to permanent location in Btree file // clear the dirty bit -BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no) +BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit) { off64_t off = page_no << mgr->page_bits; - page->page_no = page_no; - -#ifdef unix - if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size ) + if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size ) { + fprintf (stderr, "Unable to write page %d errno = %d\n", page_no, errno); return BTERR_wrt; -#else -OVERLAPPED ovl[1]; -uint amt[1]; + } + mgr->writes++; + return 0; +/* +int flag = PROT_READ | PROT_WRITE; +uint segment = page_no >> 32; +unsigned char *perm; - memset (ovl, 0, sizeof(OVERLAPPED)); - ovl->Offset = off; - ovl->OffsetHigh = off >> 32; + while( 1 ) { + if( segment < mgr->segments ) { + perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits); 
+ memcpy (perm, page, mgr->page_size); + if( syncit ) + msync (perm, mgr->page_size, MS_SYNC); + mgr->writes++; + return 0; + } - if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) ) - return BTERR_wrt; + bt_mutexlock (mgr->maps); - if( *amt < mgr->page_size ) - return BTERR_wrt; -#endif - mgr->writes++; - return 0; + if( segment < mgr->segments ) { + bt_releasemutex (mgr->maps); + continue; + } + + mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16)); + bt_releasemutex (mgr->maps); + mgr->segments++; + } */ } // set CLOCK bit in latch @@ -992,26 +1084,10 @@ void bt_unpinlatch (BtMgr *mgr, BtLatchSet *latch) latch->pin |= CLOCK_bit; latch->pin--; - // if not doing redo recovery - // set latch available - - if( mgr->redopages ) - if( !(latch->pin & ~CLOCK_bit) ) { - latch->avail = 1; -#ifdef unix - __sync_fetch_and_add (&mgr->available, 1); -#else - _InterlockedIncrement(&mgr->available); -#endif - } - bt_releasemutex(latch->modify); } // return the btree cached page address -// if page is dirty and has not yet been -// flushed to disk for the current redo -// recovery buffer, write it out. 
BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch) { @@ -1023,24 +1099,16 @@ BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool); // return next available latch entry // and with latch entry locked -uint bt_availnext (BtMgr *mgr, ushort thread_id) +uint bt_availnext (BtMgr *mgr) { BtLatchSet *latch; uint entry; while( 1 ) { - - // flush dirty pages if none are available - // and we aren't doing redo recovery - - if( !mgr->redopages ) - if( !mgr->available ) - bt_flushlsn (mgr, thread_id); - #ifdef unix - entry = __sync_fetch_and_add (&mgr->latchavail, 1) + 1; + entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1; #else - entry = _InterlockedIncrement (&mgr->latchavail); + entry = _InterlockedIncrement (&mgr->latchvictim); #endif entry %= mgr->latchtotal; @@ -1049,24 +1117,26 @@ uint entry; latch = mgr->latchsets + entry; - if( !latch->avail ) + if( !bt_mutextry(latch->modify) ) continue; - bt_mutexlock(latch->modify); + // return this entry if it is not pinned - if( !latch->avail ) { - bt_releasemutex(latch->modify); - continue; - } + if( !latch->pin ) + return entry; + + // if the CLOCK bit is set + // reset it to zero. 
- return entry; + latch->pin &= ~CLOCK_bit; + bt_releasemutex(latch->modify); } } -// find available latchset +// pin page in buffer pool // return with latchset pinned -BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage loadit, ushort thread_id) +BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id) { uint hashidx = page_no % mgr->latchhash; BtLatchSet *latch; @@ -1085,34 +1155,25 @@ BtPage page; } while( entry = latch->next ); // found our entry: increment pin - // remove from available status if( entry ) { latch = mgr->latchsets + entry; bt_mutexlock(latch->modify); - if( latch->avail ) -#ifdef unix - __sync_fetch_and_add (&mgr->available, -1); -#else - _InterlockedDecrement(&mgr->available); -#endif - latch->avail = 0; latch->pin |= CLOCK_bit; latch->pin++; - +if(contents) +abort(); bt_releasemutex(latch->modify); bt_releasemutex(mgr->hashtable[hashidx].latch); return latch; } - // find and reuse entry from available set + // find and reuse unpinned entry trynext: - if( entry = bt_availnext (mgr, thread_id) ) - latch = mgr->latchsets + entry; - else - return mgr->line = __LINE__, mgr->err = BTERR_avail, NULL; + entry = bt_availnext (mgr); + latch = mgr->latchsets + entry; idx = latch->page_no % mgr->latchhash; @@ -1140,27 +1201,21 @@ trynext: bt_releasemutex (mgr->hashtable[idx].latch); } - // remove available status - - latch->avail = 0; -#ifdef unix - __sync_fetch_and_add (&mgr->available, -1); -#else - _InterlockedDecrement(&mgr->available); -#endif page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool); + // update permanent page area in btree from buffer pool + // no read-lock is required since page is not pinned. 
+ if( latch->dirty ) - if( mgr->err = bt_writepage (mgr, page, latch->page_no) ) + if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) ) return mgr->line = __LINE__, NULL; else latch->dirty = 0; - if( loadit ) { - memcpy (page, loadit, mgr->page_size); + if( contents ) { + memcpy (page, contents, mgr->page_size); latch->dirty = 1; - } else - if( bt_readpage (mgr, page, page_no) ) + } else if( bt_readpage (mgr, page, page_no) ) return mgr->line = __LINE__, NULL; // link page as head of hash table chain @@ -1217,7 +1272,7 @@ uint slot; latch = mgr->latchsets + slot; if( latch->dirty ) { - bt_writepage(mgr, page, latch->page_no); + bt_writepage(mgr, page, latch->page_no, 0); latch->dirty = 0, num++; } } @@ -1240,6 +1295,9 @@ uint slot; fprintf(stderr, "%d buffer pool pages flushed\n", num); #ifdef unix + while( mgr->segments ) + munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits); + munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits); munmap (mgr->pagezero, mgr->page_size); #else @@ -1368,15 +1426,19 @@ BtVal *val; // and page(s) of latches and page pool cache memset (pagezero, 0, 1 << bits); + pagezero->alloc->lvl = MIN_lvl - 1; pagezero->alloc->bits = mgr->page_bits; bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1); + pagezero->activepages = 2; // initialize left-most LEAF page in - // alloc->left. + // alloc->left and count of active leaf pages. bt_putid (pagezero->alloc->left, LEAF_page); - if( bt_writepage (mgr, pagezero->alloc, 0) ) { + ftruncate (mgr->idx, REDO_page << mgr->page_bits); + + if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) { fprintf (stderr, "Unable to create btree page zero\n"); return bt_mgrclose (mgr), NULL; } @@ -1385,7 +1447,8 @@ BtVal *val; pagezero->alloc->bits = mgr->page_bits; for( lvl=MIN_lvl; lvl--; ) { - slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal)); + BtSlot *node = slotptr(pagezero->alloc, 1); + node->off = mgr->page_size - 3 - (lvl ? 
BtId + sizeof(BtVal): sizeof(BtVal)); key = keyptr(pagezero->alloc, 1); key->len = 2; // create stopper key key->key[0] = 0xff; @@ -1396,13 +1459,14 @@ BtVal *val; val->len = lvl ? BtId : 0; memcpy (val->value, value, val->len); - pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off; + pagezero->alloc->min = node->off; pagezero->alloc->lvl = lvl; pagezero->alloc->cnt = 1; pagezero->alloc->act = 1; + pagezero->alloc->page_no = MIN_lvl - lvl; - if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) { - fprintf (stderr, "Unable to create btree page zero\n"); + if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) { + fprintf (stderr, "Unable to create btree page\n"); return bt_mgrclose (mgr), NULL; } } @@ -1429,8 +1493,10 @@ mgrlatch: fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno); return bt_mgrclose (mgr), NULL; } - if( mgr->redopages = redopages ) + if( mgr->redopages = redopages ) { + ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits); mgr->redobuff = valloc (redopages * mgr->page_size); + } #else flag = PAGE_READWRITE; mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL); @@ -1468,14 +1534,6 @@ mgrlatch: mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal); mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry); - // mark all pool entries as available - - for( idx = 1; idx < mgr->latchtotal; idx++ ) { - latch = mgr->latchsets + idx; - latch->avail = 1; - mgr->available++; - } - return mgr; } @@ -1599,13 +1657,15 @@ int blk; // use empty chain first // else allocate empty page - if( page_no = bt_getid(mgr->pagezero->chain) ) { + if( page_no = bt_getid(mgr->pagezero->freechain) ) { if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) ) set->page = bt_mappage (mgr, set->latch); else - return mgr->err = BTERR_struct, mgr->line = __LINE__, -1; + return mgr->line = __LINE__, mgr->err = 
BTERR_struct; + + bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right)); + mgr->pagezero->activepages++; - bt_putid(mgr->pagezero->chain, bt_getid(set->page->right)); bt_releasemutex(mgr->lock); memcpy (set->page, contents, mgr->page_size); @@ -1619,16 +1679,20 @@ int blk; // unlock allocation latch and // extend file into new page. + mgr->pagezero->activepages++; + + ftruncate (mgr->idx, (uid)(page_no + 1) << mgr->page_bits); bt_releasemutex(mgr->lock); - // don't load cache from btree page + // don't load cache from btree page, load it from contents + + contents->page_no = page_no; if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) ) set->page = bt_mappage (mgr, set->latch); else return mgr->err; - set->latch->dirty = 1; return 0; } @@ -1778,20 +1842,22 @@ void bt_freepage (BtMgr *mgr, BtPageSet *set) // store chain - memcpy(set->page->right, mgr->pagezero->chain, BtId); - bt_putid(mgr->pagezero->chain, set->latch->page_no); + memcpy(set->page->right, mgr->pagezero->freechain, BtId); + bt_putid(mgr->pagezero->freechain, set->latch->page_no); set->latch->dirty = 1; set->page->free = 1; + // decrement active page count + // and unlock allocation page + + mgr->pagezero->activepages--; + bt_releasemutex (mgr->lock); + // unlock released page bt_unlockpage (BtLockDelete, set->latch); bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (mgr, set->latch); - - // unlock allocation page - - bt_releasemutex (mgr->lock); } // a fence key was deleted from a page @@ -1824,7 +1890,7 @@ uint idx; bt_putid (value, set->latch->page_no); ptr = (BtKey*)leftkey; - if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) ) + if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) ) return mgr->err; // now delete old fence key @@ -1885,7 +1951,6 @@ uint idx; // delete a page and manage keys // call with page writelocked -// returns with page unpinned BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort 
thread_no) { @@ -1897,7 +1962,7 @@ uid page_no; BtKey *ptr; // cache copy of fence key - // to post in parent + // to remove in parent ptr = keyptr(set->page, set->page->cnt); memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey)); @@ -1945,7 +2010,7 @@ BtKey *ptr; bt_putid (value, set->latch->page_no); ptr = (BtKey*)higherfence; - if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) ) + if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) ) return mgr->err; // delete old lower key to our node @@ -1963,7 +2028,6 @@ BtKey *ptr; bt_freepage (mgr, right); bt_unlockpage (BtLockParent, set->latch); - bt_unpinlatch (mgr, set->latch); return 0; } @@ -1974,33 +2038,43 @@ BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort t { uint slot, idx, found, fence; BtPageSet set[1]; +BtSlot *node; BtKey *ptr; BtVal *val; - if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) + if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) { + node = slotptr(set->page, slot); ptr = keyptr(set->page, slot); - else + } else return mgr->err; // if librarian slot, advance to real slot - if( slotptr(set->page, slot)->type == Librarian ) + if( node->type == Librarian ) { ptr = keyptr(set->page, ++slot); + node = slotptr(set->page, slot); + } fence = slot == set->page->cnt; - // if key is found delete it, otherwise ignore request + // delete the key, ignore request if already dead if( found = !keycmp (ptr, key, len) ) - if( found = slotptr(set->page, slot)->dead == 0 ) { + if( found = node->dead == 0 ) { val = valptr(set->page,slot); - slotptr(set->page, slot)->dead = 1; set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal); set->page->act--; + // mark node type as delete + + node->type = Delete; + node->dead = 1; + // collapse empty slots beneath the fence + // on interiour nodes - while( idx = set->page->cnt - 1 ) + if( lvl ) + while( idx = set->page->cnt - 1 ) 
if( slotptr(set->page, idx)->dead ) { *slotptr(set->page, idx) = *slotptr(set->page, idx + 1); memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot)); @@ -2026,11 +2100,14 @@ BtVal *val; // delete empty page - if( !set->page->act ) - return bt_deletepage (mgr, set, thread_no); + if( !set->page->act ) { + if( bt_deletepage (mgr, set, thread_no) ) + return mgr->err; + } else { + set->latch->dirty = 1; + bt_unlockpage(BtLockWrite, set->latch); + } - set->latch->dirty = 1; - bt_unlockpage(BtLockWrite, set->latch); bt_unpinlatch (mgr, set->latch); return 0; } @@ -2278,6 +2355,8 @@ BtVal *val; root->page->act = 2; root->page->lvl++; + mgr->pagezero->alloc->lvl = root->page->lvl; + // release and unpin root pages bt_unlockpage(BtLockWrite, root->latch); @@ -2440,7 +2519,7 @@ BtKey *ptr; bt_putid (value, set->latch->page_no); ptr = (BtKey *)leftkey; - if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) ) + if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) ) return mgr->err; // switch fence for right block of larger keys to new right page @@ -2448,7 +2527,7 @@ BtKey *ptr; bt_putid (value, right->page_no); ptr = (BtKey *)rightkey; - if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) ) + if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) ) return mgr->err; bt_unlockpage (BtLockParent, set->latch); @@ -2537,37 +2616,19 @@ BtVal *val; // Insert new key into the btree at given level. 
// either add a new key or update/add an existing one -BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique, ushort thread_no) +BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no) { -unsigned char newkey[BT_keyarray]; uint slot, idx, len, entry; BtPageSet set[1]; -BtKey *ptr, *ins; -uid sequence; +BtSlot *node; +BtKey *ptr; BtVal *val; -uint type; - - // set up the key we're working on - - ins = (BtKey*)newkey; - memcpy (ins->key, key, keylen); - ins->len = keylen; - - // is this a non-unique index value? - - if( unique ) - type = Unique; - else { - type = Duplicate; - sequence = bt_newdup (mgr); - bt_putid (ins->key + ins->len + sizeof(BtKey), sequence); - ins->len += BtId; - } while( 1 ) { // find the page and slot for the current key - if( slot = bt_loadpage (mgr, set, ins->key, ins->len, lvl, BtLockWrite, thread_no) ) + if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) { + node = slotptr(set->page, slot); ptr = keyptr(set->page, slot); - else { + } else { if( !mgr->err ) mgr->line = __LINE__, mgr->err = BTERR_ovflw; return mgr->err; @@ -2575,80 +2636,86 @@ uint type; // if librarian slot == found slot, advance to real slot - if( slotptr(set->page, slot)->type == Librarian ) - if( !keycmp (ptr, key, keylen) ) + if( node->type == Librarian ) + if( !keycmp (ptr, key, keylen) ) { ptr = keyptr(set->page, ++slot); + node = slotptr(set->page, slot); + } - len = ptr->len; - - if( slotptr(set->page, slot)->type == Duplicate ) - len -= BtId; - - // if inserting a duplicate key or unique key + // if inserting a duplicate key or unique + // key that doesn't exist on the page, // check for adequate space on the page // and insert the new key before slot. 
- if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) { - if( !(slot = bt_cleanpage (mgr, set, ins->len, slot, vallen)) ) - if( !(entry = bt_splitpage (mgr, set, thread_no)) ) - return mgr->err; - else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) ) - return mgr->err; - else - continue; - - return bt_insertslot (mgr, set, slot, ins->key, ins->len, value, vallen, type, 1); - } + switch( type ) { + case Unique: + case Duplicate: + if( keycmp (ptr, key, keylen) ) + if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) ) + return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1); + else if( !(entry = bt_splitpage (mgr, set, thread_no)) ) + return mgr->err; + else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) ) + return mgr->err; + else + continue; - // if key already exists, update value and return + // if key already exists, update value and return - val = valptr(set->page, slot); + val = valptr(set->page, slot); - if( val->len >= vallen ) { + if( val->len >= vallen ) { if( slotptr(set->page, slot)->dead ) set->page->act++; + node->type = type; + node->dead = 0; + set->page->garbage += val->len - vallen; set->latch->dirty = 1; - slotptr(set->page, slot)->dead = 0; val->len = vallen; memcpy (val->value, value, vallen); bt_unlockpage(BtLockWrite, set->latch); bt_unpinlatch (mgr, set->latch); return 0; - } + } - // new update value doesn't fit in existing value area + // new update value doesn't fit in existing value area + // make sure page has room - if( !slotptr(set->page, slot)->dead ) + if( !node->dead ) set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal); - else { - slotptr(set->page, slot)->dead = 0; + else set->page->act++; - } - if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) ) - if( !(entry = bt_splitpage (mgr, set, thread_no)) ) + node->type = type; + node->dead = 0; + + if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) ) + if( 
!(entry = bt_splitpage (mgr, set, thread_no)) ) return mgr->err; - else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) ) + else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) ) return mgr->err; - else + else continue; - set->page->min -= vallen + sizeof(BtVal); - val = (BtVal*)((unsigned char *)set->page + set->page->min); - memcpy (val->value, value, vallen); - val->len = vallen; + // copy key and value onto page and update slot - set->latch->dirty = 1; - set->page->min -= keylen + sizeof(BtKey); - ptr = (BtKey*)((unsigned char *)set->page + set->page->min); - memcpy (ptr->key, key, keylen); - ptr->len = keylen; + set->page->min -= vallen + sizeof(BtVal); + val = (BtVal*)((unsigned char *)set->page + set->page->min); + memcpy (val->value, value, vallen); + val->len = vallen; + + set->latch->dirty = 1; + set->page->min -= keylen + sizeof(BtKey); + ptr = (BtKey*)((unsigned char *)set->page + set->page->min); + memcpy (ptr->key, key, keylen); + ptr->len = keylen; - slotptr(set->page, slot)->off = set->page->min; - bt_unlockpage(BtLockWrite, set->latch); - bt_unpinlatch (mgr, set->latch); - return 0; + node->off = set->page->min; + bt_unlockpage(BtLockWrite, set->latch); + bt_unpinlatch (mgr, set->latch); + return 0; + } } return 0; } @@ -2745,32 +2812,42 @@ uint entry, slot; return mgr->line = __LINE__, mgr->err = BTERR_atomic; } +// perform delete from smaller btree +// insert a delete slot if not found there + BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no) { BtKey *key = keyptr(source, src); BtPageSet set[1]; uint idx, slot; +BtSlot *node; BtKey *ptr; BtVal *val; - if( slot = bt_atomicpage (mgr, source, locks, src, set) ) + if( slot = bt_atomicpage (mgr, source, locks, src, set) ) { + node = slotptr(set->page, slot); ptr = keyptr(set->page, slot); - else + val = valptr(set->page, slot); + } else return mgr->line = __LINE__, mgr->err = BTERR_struct; - if( !keycmp (ptr, key->key, 
key->len) ) - if( !slotptr(set->page, slot)->dead ) - slotptr(set->page, slot)->dead = 1; - else - return 0; - else + // if slot is not found, insert a delete slot + + if( keycmp (ptr, key->key, key->len) ) + return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0); + + // if node is already dead, + // ignore the request. + + if( node->dead ) return 0; - val = valptr(set->page, slot); set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal); set->latch->dirty = 1; set->page->lsn = locks[src].lsn; set->page->act--; + + node->dead = 1; mgr->found++; return 0; } @@ -2820,7 +2897,7 @@ BtKey *ptr; ptr = keyptr(right->page,right->page->cnt); bt_putid (value, prev->latch->page_no); - if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, 1, thread_no) ) + if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) ) return mgr->err; // now that master page is in good shape we can @@ -2835,6 +2912,8 @@ BtKey *ptr; if( right_page_no = bt_getid (prev->page->right) ) { if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) ) temp->page = bt_mappage (mgr, temp->latch); + else + return mgr->err; bt_lockpage (BtLockWrite, temp->latch, thread_no); bt_putid (temp->page->left, prev->latch->page_no); @@ -2859,181 +2938,6 @@ BtKey *ptr; return 0; } -// find and add the next available latch entry -// to the queue - -BTERR bt_txnavaillatch (BtDb *bt) -{ -BtLatchSet *latch; -uint startattempt; -uint cnt, entry; -uint hashidx; -BtPage page; - - // find and reuse previous entry on victim - - startattempt = bt->mgr->latchvictim; - - while( 1 ) { -#ifdef unix - entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1); -#else - entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1; -#endif - // skip entry if it has outstanding pins - - entry %= bt->mgr->latchtotal; - - if( !entry ) - continue; - - // only go around one time before - // flushing redo recovery buffer, - // and the buffer pool to free up entries. 
- - if( bt->mgr->redopages ) - if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) { - if( bt_mutextry (bt->mgr->dump) ) { - if( bt_dumpredo (bt->mgr) ) - return bt->mgr->err; - bt_flushlsn (bt->mgr, bt->thread_no); - // synchronize the various threads running into this condition - // so that only one thread does the dump and flush - } else - bt_mutexlock(bt->mgr->dump); - - startattempt = bt->mgr->latchvictim; - bt_releasemutex(bt->mgr->dump); - } - - latch = bt->mgr->latchsets + entry; - - if( latch->avail ) - continue; - - bt_mutexlock(latch->modify); - - // skip if already an available entry - - if( latch->avail ) { - bt_releasemutex(latch->modify); - continue; - } - - // skip this entry if it is pinned - // if the CLOCK bit is set - // reset it to zero. - - if( latch->pin ) { - latch->pin &= ~CLOCK_bit; - bt_releasemutex(latch->modify); - continue; - } - - page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool); - - // if dirty page has lsn >= last redo recovery buffer - // then hold page in the buffer pool until next redo - // recovery buffer is being written to disk. 
- - if( latch->dirty ) - if( page->lsn >= bt->mgr->flushlsn ) { - bt_releasemutex(latch->modify); - continue; - } - - // entry is now available -#ifdef unix - __sync_fetch_and_add (&bt->mgr->available, 1); -#else - _InterlockedIncrement(&bt->mgr->available); -#endif - latch->avail = 1; - bt_releasemutex(latch->modify); - return 0; - } -} - -// release available latch requests - -void bt_txnavailrelease (BtDb *bt, uint count) -{ -#ifdef unix - __sync_fetch_and_add(bt->mgr->availlock, -count); -#else - _InterlockedAdd(bt->mgr->availlock, -count); -#endif -} - -// promote page of keys from first btree -// into main btree - -BTERR bt_txnavailmain (BtDb *bt) -{ -BtLatchSet *latch; -uint entry; - - while( 1 ) { -#ifdef unix - entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1); -#else - entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1; -#endif - // skip entry if it has outstanding pins - - entry %= bt->mgr->latchtotal; - - if( !entry ) - continue; - - latch = bt->mgr->latchsets + entry; - - if( latch->avail ) - continue; - - bt_mutexlock(latch->modify); - - // skip if already an available entry - - if( latch->avail ) { - bt_releasemutex(latch->modify); - continue; - } - - // skip this entry if it is pinned - // if the CLOCK bit is set - // reset it to zero. - - if( latch->pin ) { - latch->pin &= ~CLOCK_bit; - bt_releasemutex(latch->modify); - continue; - } - - } -} - -// commit available pool entries -// find available entries as required - -BTERR bt_txnavailrequest (BtDb *bt, uint count) -{ -#ifdef unix - __sync_fetch_and_add(bt->mgr->availlock, count); -#else - _InterlockedAdd(bt->mgr->availlock, count); -#endif - - // find another available pool entry - - while( *bt->mgr->availlock > bt->mgr->available ) - if( bt->mgr->redopages ) - bt_txnavaillatch (bt); - else - if( bt_txnavailmain (bt) ) - return bt->mgr->err; -} - // atomic modification of a batch of keys. 
// return -1 if BTERR is set @@ -3041,9 +2945,9 @@ BTERR bt_txnavailrequest (BtDb *bt, uint count) // causing the key constraint violation // or zero on successful completion. -int bt_atomictxn (BtDb *bt, BtPage source) +BTERR bt_atomictxn (BtDb *bt, BtPage source) { -uint src, idx, slot, samepage, entry, avail, que = 0; +uint src, idx, slot, samepage, entry, que = 0; AtomicKey *head, *tail, *leaf; BtPageSet set[1], prev[1]; unsigned char value[BtId]; @@ -3079,12 +2983,14 @@ int type; } } - // reserve enough buffer pool entries - - avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1; + // add entries to redo log - if( bt_txnavailrequest (bt, avail) ) - return bt->mgr->err; + if( bt->mgr->redopages ) + if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) ) + for( src = 0; src++ < source->cnt; ) + locks[src].lsn = lsn; + else + return bt->mgr->err; // Load the leaf page for each key // group same page references with reuse bit @@ -3108,7 +3014,7 @@ int type; if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) ) set->latch->split = 0; else - goto atomicerr; + return bt->mgr->err; if( slotptr(set->page, slot)->type == Librarian ) ptr = keyptr(set->page, ++slot); @@ -3128,34 +3034,6 @@ int type; // capture current lsn for master page locks[src].reqlsn = set->page->lsn; - - // perform constraint checks - - switch( slotptr(source, src)->type ) { - case Duplicate: - case Unique: - if( !slotptr(set->page, slot)->dead ) - if( slot < set->page->cnt || bt_getid (set->page->right) ) - if( !keycmp (ptr, key->key, key->len) ) { - - // return constraint violation if key already exists - - bt_unlockpage(BtLockRead, set->latch); - result = src; - - while( src ) { - if( locks[src].entry ) { - set->latch = bt->mgr->latchsets + locks[src].entry; - bt_unlockpage(BtLockAtomic, set->latch); - bt_unpinlatch (bt->mgr, set->latch); - } - src--; - } - free (locks); - return result; - } - break; - } } // unlock last loadpage lock @@ 
-3163,22 +3041,15 @@ int type; if( source->cnt ) bt_unlockpage(BtLockRead, set->latch); - // and add entries to redo log - - if( bt->mgr->redopages ) - if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) ) - for( src = 0; src++ < source->cnt; ) - locks[src].lsn = lsn; - else - goto atomicerr; - // obtain write lock for each master page + // sync flushed pages to disk for( src = 0; src++ < source->cnt; ) { if( locks[src].reuse ) continue; - else - bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry, bt->thread_no); + + set->latch = bt->mgr->latchsets + locks[src].entry; + bt_lockpage (BtLockWrite, set->latch, bt->thread_no); } // insert or delete each key @@ -3204,13 +3075,13 @@ int type; switch( slotptr(source,idx)->type ) { case Delete: if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) ) - goto atomicerr; + return bt->mgr->err; break; case Duplicate: case Unique: if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) ) - goto atomicerr; + return bt->mgr->err; break; } @@ -3293,7 +3164,7 @@ int type; if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) ) set->page = bt_mappage (bt->mgr, set->latch); else - goto atomicerr; + return bt->mgr->err; bt_lockpage (BtLockWrite, set->latch, bt->thread_no); bt_putid (set->page->left, prev->latch->page_no); @@ -3350,7 +3221,7 @@ int type; ptr = keyptr(prev->page,prev->page->cnt); if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) ) - goto atomicerr; + return bt->mgr->err; // perform the remainder of the delete // from the FIFO queue @@ -3376,13 +3247,6 @@ int type; bt_unlockpage(BtLockWrite, prev->latch); } - bt_txnavailrelease (bt, avail); - - que *= bt->mgr->pagezero->alloc->lvl; - - if( bt_txnavailrequest (bt, que) ) - return bt->mgr->err; - // add & delete keys for any pages split or merged during transaction if( leaf = head ) @@ -3395,20 +3259,20 @@ int type; switch( leaf->type ) { case 0: // insert key - if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, 
value, BtId, 1, bt->thread_no) ) - goto atomicerr; + if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) ) + return bt->mgr->err; break; case 1: // delete key if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) ) - goto atomicerr; + return bt->mgr->err; break; case 2: // free page if( bt_atomicfree (bt->mgr, set, bt->thread_no) ) - goto atomicerr; + return bt->mgr->err; break; } @@ -3421,14 +3285,131 @@ int type; free (leaf); } while( leaf = tail ); - bt_txnavailrelease (bt, que); + // if number of active pages + // is greater than the buffer pool + // promote page into larger btree + + while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 ) + if( bt_txnpromote (bt) ) + return bt->mgr->err; // return success free (locks); return 0; -atomicerr: - return -1; +} + +// promote a page into the larger btree + +BTERR bt_txnpromote (BtDb *bt) +{ +uint entry, slot, idx; +BtPageSet set[1]; +BtSlot *node; +BtKey *ptr; +BtVal *val; + + while( 1 ) { +#ifdef unix + entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1); +#else + entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1; +#endif + entry %= bt->mgr->latchtotal; + + if( !entry ) + continue; + + set->latch = bt->mgr->latchsets + entry; + idx = set->latch->page_no % bt->mgr->latchhash; + + if( !bt_mutextry(set->latch->modify) ) + continue; + +// if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) { +// bt_releasemutex(set->latch->modify); +// continue; +// } + + // skip this entry if it is pinned + + if( set->latch->pin & ~CLOCK_bit ) { + bt_releasemutex(set->latch->modify); +// bt_releasemutex(bt->mgr->hashtable[idx].latch); + continue; + } + + set->page = bt_mappage (bt->mgr, set->latch); + + // entry never used or has no right sibling + + if( !set->latch->page_no || !bt_getid (set->page->right) ) { + bt_releasemutex(set->latch->modify); +// bt_releasemutex(bt->mgr->hashtable[idx].latch); + continue; + } + + bt_lockpage (BtLockAccess, set->latch, 
bt->thread_no); + bt_lockpage (BtLockWrite, set->latch, bt->thread_no); + bt_unlockpage(BtLockAccess, set->latch); + + // entry interiour node or being or was killed + + if( set->page->lvl || set->page->free || set->page->kill ) { + bt_releasemutex(set->latch->modify); +// bt_releasemutex(bt->mgr->hashtable[idx].latch); + bt_unlockpage(BtLockWrite, set->latch); + continue; + } + + // pin the page for our useage + + set->latch->pin++; + bt_releasemutex(set->latch->modify); +// bt_releasemutex(bt->mgr->hashtable[idx].latch); + + // if page is dirty, then + // sync it to the disk first. + + if( set->latch->dirty ) + if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) ) + return bt->mgr->line = __LINE__, bt->mgr->err; + else + set->latch->dirty = 0; + + // transfer slots in our selected page to larger btree +if( !(set->latch->page_no % 100) ) +fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt); + + for( slot = 0; slot++ < set->page->cnt; ) { + ptr = keyptr (set->page, slot); + val = valptr (set->page, slot); + node = slotptr(set->page, slot); + + switch( node->type ) { + case Duplicate: + case Unique: + if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) ) + return bt->main->err; + + continue; + + case Delete: + if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) ) + return bt->main->err; + + continue; + } + } + + // now delete the page + + if( bt_deletepage (bt->mgr, set, bt->thread_no) ) + return bt->mgr->err; + + bt_unpinlatch (bt->mgr, set->latch); + return 0; + } } // set cursor to highest slot on highest page @@ -3628,14 +3609,14 @@ uint entry = 0; if( *latch->access->rin & MASK ) fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no); - if( *latch->parent->exclusive ) - fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no); +// if( *latch->parent->xcl->value ) +// fprintf(stderr, "latchset %d 
parentlocked for page %d\n", entry, latch->page_no); - if( *latch->atomic->exclusive ) - fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no); +// if( *latch->atomic->xcl->value ) +// fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no); - if( *latch->modify->exclusive ) - fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no); +// if( *latch->modify->value ) +// fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no); if( latch->pin & ~CLOCK_bit ) fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no); @@ -3709,7 +3690,7 @@ FILE *in; line++; if( !args->num ) { - if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, 1, bt->thread_no) ) + if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) ) fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0); len = 0; continue; @@ -3753,7 +3734,7 @@ FILE *in; { line++; - if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, 1, bt->thread_no) ) + if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) ) fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0); len = 0; } @@ -3808,7 +3789,6 @@ FILE *in; cnt++; } - set->latch->avail = 1; bt_unlockpage (BtLockRead, set->latch); bt_unpinlatch (bt->mgr, set->latch); } while( page_no = next ); @@ -3844,7 +3824,7 @@ FILE *in; posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL); #endif fprintf(stderr, "started counting\n"); - page_no = LEAF_page; + page_no = LEAF_page + bt->mgr->redopages + 1; next = page_no + 1; while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) { if( bt_readpage (bt->mgr, bt->cursor, page_no) ) @@ -3853,7 +3833,8 @@ if( !bt->cursor->free && !bt->cursor->lvl ) cnt += bt->cursor->act; - page_no++; + bt->mgr->reads++; + page_no = next++; } cnt--; // remove stopper key -- 2.40.0