-// btree version threadskv10 FUTEX version
+// btree version threadskv10 futex version
// with reworked bt_deletekey code,
// phase-fair reader writer lock,
// librarian page split code,
// traditional buffer pool manager
// ACID batched key-value updates
// redo log for failure recovery
-// and dual B-trees for write optimization
+// and LSM B-trees for write optimization
-// 09 OCT 2014
+// 11 OCT 2014
// author: karl malbrain, malbrain@cal.berkeley.edu
BtLockAtomic = 32
} BtLock;
+// lite weight spin latch
+
+volatile typedef struct {
+ ushort exclusive:1;
+ ushort pending:1;
+ ushort share:14;
+} BtMutexLatch;
+
+#define XCL 1
+#define PEND 2
+#define BOTH 3
+#define SHARE 4
+
+/*
+// exclusive is set for write access
+
+volatile typedef struct {
+ unsigned char exclusive[1];
+ unsigned char filler;
+} BtMutexLatch;
+*/
+/*
+typedef struct {
+ union {
+ struct {
+ volatile uint xlock:1; // one writer has exclusive lock
+ volatile uint wrt:31; // count of other writers waiting
+ } bits[1];
+ uint value[1];
+ };
+} BtMutexLatch;
+*/
+#define XCL 1
+#define WRT 2
+
// definition for phase-fair reader/writer lock implementation
typedef struct {
volatile ushort rout[1];
volatile ushort ticket[1];
volatile ushort serving[1];
- ushort tid;
- ushort dup;
+ volatile ushort tid[1];
+ volatile ushort dup[1];
} RWLock;
// write only lock
typedef struct {
- volatile uint exclusive[1];
- ushort tid;
- ushort dup;
+ BtMutexLatch xcl[1];
+ volatile ushort tid[1];
+ volatile ushort dup[1];
} WOLock;
#define PHID 0x1
#define MASK 0x3
#define RINC 0x4
-// lite weight mutex
-
-// exclusive is set for write access
-
-typedef struct {
- volatile uint exclusive[1];
-} BtMutexLatch;
-
-#define XCL 1
-
// mode & definition for lite latch implementation
enum {
uint prev; // prev entry in hash table chain
ushort pin; // number of accessing threads
unsigned char dirty; // page in cache is dirty (atomic set)
- unsigned char avail; // page is an available entry
BtMutexLatch modify[1]; // modify entry lite latch
} BtLatchSet;
Unique,
Librarian,
Duplicate,
- Delete,
- Update
+ Delete
} BtSlotType;
typedef struct {
// structure for latch manager on ALLOC_page
typedef struct {
- struct BtPage_ alloc[1]; // next page_no in right ptr
- unsigned long long dups[1]; // global duplicate key uniqueifier
- unsigned char chain[BtId]; // head of free page_nos chain
+ struct BtPage_ alloc[1]; // next page_no in right ptr
+ unsigned long long dups[1]; // global duplicate key uniqueifier
+ unsigned char freechain[BtId]; // head of free page_nos chain
+	unsigned long long activepages;	// number of active pages
} BtPageZero;
// The object structure for Btree access
unsigned char *pagepool; // mapped to the buffer pool pages
unsigned char *redobuff; // mapped recovery buffer pointer
logseqno lsn, flushlsn; // current & first lsn flushed
- BtMutexLatch dump[1]; // redo dump lite latch
BtMutexLatch redo[1]; // redo area lite latch
BtMutexLatch lock[1]; // allocation area lite latch
+ BtMutexLatch maps[1]; // mapping segments lite latch
ushort thread_no[1]; // next thread number
uint nlatchpage; // number of latch pages at BT_latch
uint latchtotal; // number of page latch entries
uint latchhash; // number of latch hash table slots
uint latchvictim; // next latch entry to examine
- uint latchavail; // next available latch entry
- uint availlock[1]; // latch available commitments
- uint available; // number of available latches
+ uint latchpromote; // next latch entry to promote
uint redopages; // size of recovery buff in pages
+ uint redolast; // last msync size of recovery buff
uint redoend; // eof/end element in recovery buff
int err; // last error
int line; // last error line no
HANDLE halloc; // allocation handle
HANDLE hpool; // buffer pool handle
#endif
+ uint segments; // number of memory mapped segments
+ unsigned char *pages[64000];// memory mapped segments of b-tree
} BtMgr;
typedef struct {
BTERR_read,
BTERR_wrt,
BTERR_atomic,
- BTERR_recovery,
- BTERR_avail
+ BTERR_recovery
} BTERR;
#define CLOCK_bit 0x8000
extern void bt_close (BtDb *bt);
extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
-extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
+extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit);
extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
-extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update, ushort thread_no);
+extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
extern BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
+// transaction functions
+BTERR bt_txnpromote (BtDb *bt);
+
// The page is allocated from low and hi ends.
// The key slots are allocated from the bottom,
// while the text and value of the key
#endif
}
-// Write-Only Queue Lock
+// lite weight spin lock Latch Manager
-void WriteOLock (WOLock *lock, ushort tid)
+int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
{
-uint prev;
+ return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+}
+
+void bt_mutexlock(BtMutexLatch *latch)
+{
+ushort prev;
- if( lock->tid == tid ) {
- lock->dup++;
+ do {
+#ifdef unix
+ prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
+#else
+ prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
+#endif
+ if( !(prev & XCL) )
+ if( !(prev & ~BOTH) )
+ return;
+ else
+#ifdef unix
+ __sync_fetch_and_and ((ushort *)latch, ~XCL);
+#else
+ _InterlockedAnd16((ushort *)latch, ~XCL);
+#endif
+#ifdef unix
+ } while( sched_yield(), 1 );
+#else
+ } while( SwitchToThread(), 1 );
+#endif
+/*
+unsigned char prev;
+
+ do {
+#ifdef unix
+ prev = __sync_fetch_and_or(latch->exclusive, XCL);
+#else
+ prev = _InterlockedOr8(latch->exclusive, XCL);
+#endif
+ if( !(prev & XCL) )
+ return;
+#ifdef unix
+ } while( sched_yield(), 1 );
+#else
+ } while( SwitchToThread(), 1 );
+#endif
+*/
+/*
+BtMutexLatch prev[1];
+uint slept = 0;
+
+ while( 1 ) {
+ *prev->value = __sync_fetch_and_or(latch->value, XCL);
+
+ if( !prev->bits->xlock ) { // did we set XCL bit?
+ if( slept )
+ __sync_fetch_and_sub(latch->value, WRT);
return;
}
- while( 1 ) {
+ if( !slept ) {
+ prev->bits->wrt++;
+ __sync_fetch_and_add(latch->value, WRT);
+ }
+
+ sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
+ slept = 1;
+ } */
+}
+
+// try to obtain write lock
+
+// return 1 if obtained,
+// 0 otherwise
+
+int bt_mutextry(BtMutexLatch *latch)
+{
+ushort prev;
+
+#ifdef unix
+ prev = __sync_fetch_and_or((ushort *)latch, XCL);
+#else
+ prev = _InterlockedOr16((ushort *)latch, XCL);
+#endif
+ // take write access if all bits are clear
+
+ if( !(prev & XCL) )
+ if( !(prev & ~BOTH) )
+ return 1;
+ else
#ifdef unix
- prev = __sync_fetch_and_or (lock->exclusive, 1);
+ __sync_fetch_and_and ((ushort *)latch, ~XCL);
#else
- prev = _InterlockedExchangeOr (lock->exclusive, 1);
+ _InterlockedAnd16((ushort *)latch, ~XCL);
#endif
- if( !(prev & XCL) ) {
- lock->tid = tid;
- return;
- }
+ return 0;
+/*
+unsigned char prev;
+
+#ifdef unix
+ prev = __sync_fetch_and_or(latch->exclusive, XCL);
+#else
+ prev = _InterlockedOr8(latch->exclusive, XCL);
+#endif
+ // take write access if all bits are clear
+
+ return !(prev & XCL);
+*/
+/*
+BtMutexLatch prev[1];
+
+ *prev->value = __sync_fetch_and_or(latch->value, XCL);
+
+ // take write access if exclusive bit is clear
+
+ return !prev->bits->xlock;
+*/
+}
+
+// clear write mode
+
+void bt_releasemutex(BtMutexLatch *latch)
+{
#ifdef unix
- sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
+ __sync_fetch_and_and((ushort *)latch, ~BOTH);
#else
- SwitchToThread ();
+ _InterlockedAnd16((ushort *)latch, ~BOTH);
#endif
+/*
+ *latch->exclusive = 0;
+*/
+/*
+BtMutexLatch prev[1];
+
+ *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
+
+ if( prev->bits->wrt )
+ sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
+*/
+}
+
+// Write-Only Queue Lock
+
+void WriteOLock (WOLock *lock, ushort tid)
+{
+ if( *lock->tid == tid ) {
+ *lock->dup += 1;
+ return;
}
+
+ bt_mutexlock(lock->xcl);
+ *lock->tid = tid;
}
void WriteORelease (WOLock *lock)
{
- if( lock->dup ) {
- lock->dup--;
+ if( *lock->dup ) {
+ *lock->dup -= 1;
return;
}
- *lock->exclusive = 0;
- lock->tid = 0;
-#ifdef linux
- sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
-#endif
+ *lock->tid = 0;
+ bt_releasemutex(lock->xcl);
}
// Phase-Fair reader/writer lock implementation
{
ushort w, r, tix;
- if( lock->tid == tid ) {
- lock->dup++;
+ if( *lock->tid == tid ) {
+ *lock->dup += 1;
return;
}
#ifdef unix
#else
SwitchToThread();
#endif
- lock->tid = tid;
+ *lock->tid = tid;
}
void WriteRelease (RWLock *lock)
{
- if( lock->dup ) {
- lock->dup--;
+ if( *lock->dup ) {
+ *lock->dup -= 1;
return;
}
- lock->tid = 0;
+ *lock->tid = 0;
#ifdef unix
__sync_fetch_and_and (lock->rin, ~MASK);
#else
// OK if write lock already held by same thread
- if( lock->tid == tid ) {
- lock->dup++;
+ if( *lock->tid == tid ) {
+ *lock->dup += 1;
return 1;
}
#ifdef unix
void ReadLock (RWLock *lock, ushort tid)
{
ushort w;
- if( lock->tid == tid ) {
- lock->dup++;
+
+ if( *lock->tid == tid ) {
+ *lock->dup += 1;
return;
}
#ifdef unix
void ReadRelease (RWLock *lock)
{
- if( lock->dup ) {
- lock->dup--;
+ if( *lock->dup ) {
+ *lock->dup -= 1;
return;
}
#endif
}
-// lite weight spin lock Latch Manager
-
-int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
-{
- return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
-}
-
-void bt_mutexlock(BtMutexLatch *latch)
-{
-uint prev;
-
- while( 1 ) {
-#ifdef unix
- prev = __sync_fetch_and_or(latch->exclusive, XCL);
-#else
- prev = _InterlockedOr(latch->exclusive, XCL);
-#endif
- if( !(prev & XCL) )
- return;
-#ifdef unix
- sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
-#else
- SwitchToThread();
-#endif
- }
-}
-
-// try to obtain write lock
-
-// return 1 if obtained,
-// 0 otherwise
-
-int bt_mutextry(BtMutexLatch *latch)
-{
-uint prev;
-
-#ifdef unix
- prev = __sync_fetch_and_or(latch->exclusive, XCL);
-#else
- prev = _InterlockedOr(latch->exclusive, XCL);
-#endif
- // take write access if exclusive bit is clear
-
- return !(prev & XCL);
-}
-
-// clear write mode
-
-void bt_releasemutex(BtMutexLatch *latch)
-{
- *latch->exclusive = 0;
-#ifdef unix
- sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
-#endif
-}
-
// recovery manager -- flush dirty pages
void bt_flushlsn (BtMgr *mgr, ushort thread_no)
// flush dirty pool pages to the btree
-fprintf(stderr, "Start flushlsn\n");
+fprintf(stderr, "Start flushlsn ");
for( entry = 1; entry < mgr->latchtotal; entry++ ) {
page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
latch = mgr->latchsets + entry;
bt_lockpage(BtLockRead, latch, thread_no);
if( latch->dirty ) {
- bt_writepage(mgr, page, latch->page_no);
+ bt_writepage(mgr, page, latch->page_no, 0);
latch->dirty = 0, cnt++;
}
-if( latch->avail )
-cnt3++;
if( latch->pin & ~CLOCK_bit )
cnt2++;
bt_unlockpage(BtLockRead, latch);
bt_releasemutex (latch->modify);
}
-fprintf(stderr, "End flushlsn %d pages %d pinned %d available\n", cnt, cnt2, cnt3);
+fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
+fprintf(stderr, "begin sync");
+ sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER);
+fprintf(stderr, " end sync\n");
}
// recovery manager -- process current recovery buff on startup
}
}
-// recovery manager -- dump current recovery buff & flush dirty pages
-// in preparation for next recovery buffer.
-
-BTERR bt_dumpredo (BtMgr *mgr)
-{
-BtLogHdr *eof;
-fprintf(stderr, "Flush pages ");
-
- eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
- memset (eof, 0, sizeof(BtLogHdr));
-
- // flush pages written at beginning of this redo buffer
- // then write the redo buffer out to disk
-
- fdatasync (mgr->idx);
-
-fprintf(stderr, "Dump ReDo: %d bytes\n", mgr->redoend);
- pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
-
- sync_file_range (mgr->idx, REDO_page << mgr->page_bits, mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
-
- mgr->flushlsn = mgr->lsn;
- mgr->redoend = 0;
-
- eof = (BtLogHdr *)(mgr->redobuff);
- memset (eof, 0, sizeof(BtLogHdr));
- eof->lsn = mgr->lsn;
- return 0;
-}
-
// recovery manager -- append new entry to recovery log
// flush to disk when it overflows.
uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
uint amt = sizeof(BtLogHdr);
BtLogHdr *hdr, *eof;
-uint flush;
+uint last, end;
bt_mutexlock (mgr->redo);
// see if new entry fits in buffer
// flush and reset if it doesn't
- if( flush = amt > size - mgr->redoend ) {
- bt_mutexlock (mgr->dump);
-
- if( bt_dumpredo (mgr) )
- return 0;
+ if( amt > size - mgr->redoend ) {
+ mgr->flushlsn = mgr->lsn;
+ msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
+ mgr->redolast = 0;
+ mgr->redoend = 0;
+ bt_flushlsn(mgr, thread_no);
}
// fill in new entry & either eof or end block
memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
}
- bt_releasemutex(mgr->redo);
+ eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
+ memset (eof, 0, sizeof(BtLogHdr));
+ eof->lsn = mgr->lsn;
- if( flush ) {
- bt_flushlsn (mgr, thread_no);
- bt_releasemutex(mgr->dump);
- }
+ last = mgr->redolast & 0xfff;
+ end = mgr->redoend;
+ mgr->redolast = end;
+ bt_releasemutex(mgr->redo);
+
+ msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
return hdr->lsn;
}
uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
uint amt = 0, src, type;
BtLogHdr *hdr, *eof;
+uint last, end;
logseqno lsn;
-uint flush;
BtKey *key;
BtVal *val;
// see if new entry fits in buffer
// flush and reset if it doesn't
- if( flush = amt > size - mgr->redoend ) {
- bt_mutexlock (mgr->dump);
-
- if( bt_dumpredo (mgr) )
- return 0;
+ if( amt > size - mgr->redoend ) {
+ mgr->flushlsn = mgr->lsn;
+ msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
+ mgr->redolast = 0;
+ mgr->redoend = 0;
+ bt_flushlsn (mgr, thread_no);
}
// assign new lsn to transaction
case Delete:
type = BTRM_del;
break;
- case Update:
- type = BTRM_upd;
- break;
}
amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
memset (eof, 0, sizeof(BtLogHdr));
+ eof->lsn = lsn;
+ last = mgr->redolast & 0xfff;
+ end = mgr->redoend;
+ mgr->redolast = end;
bt_releasemutex(mgr->redo);
- if( flush ) {
- bt_flushlsn (mgr, thread_no);
- bt_releasemutex(mgr->dump);
- }
-
+ msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
+ bt_releasemutex(mgr->redo);
return lsn;
}
{
off64_t off = page_no << mgr->page_bits;
-#ifdef unix
if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
- return mgr->err = BTERR_read;
- }
-#else
-OVERLAPPED ovl[1];
-uint amt[1];
-
- memset (ovl, 0, sizeof(OVERLAPPED));
- ovl->Offset = off;
- ovl->OffsetHigh = off >> 32;
-
- if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
- fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError());
return BTERR_read;
}
- if( *amt < mgr->page_size ) {
- fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
- return BTERR_read;
- }
-#endif
+if( page->page_no != page_no )
+abort();
mgr->reads++;
return 0;
+/*
+int flag = PROT_READ | PROT_WRITE;
+uint segment = page_no >> 32;
+unsigned char *perm;
+
+ while( 1 ) {
+ if( segment < mgr->segments ) {
+ perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
+ memcpy (page, perm, mgr->page_size);
+if( page->page_no != page_no )
+abort();
+ mgr->reads++;
+ return 0;
+ }
+
+ bt_mutexlock (mgr->maps);
+
+ if( segment < mgr->segments ) {
+ bt_releasemutex (mgr->maps);
+ continue;
+ }
+
+ mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
+ mgr->segments++;
+
+ bt_releasemutex (mgr->maps);
+ } */
}
// write page to permanent location in Btree file
// clear the dirty bit
-BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
+BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit)
{
off64_t off = page_no << mgr->page_bits;
- page->page_no = page_no;
-
-#ifdef unix
- if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
+ if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size ) {
+ fprintf (stderr, "Unable to write page %d errno = %d\n", page_no, errno);
return BTERR_wrt;
-#else
-OVERLAPPED ovl[1];
-uint amt[1];
+ }
+ mgr->writes++;
+ return 0;
+/*
+int flag = PROT_READ | PROT_WRITE;
+uint segment = page_no >> 32;
+unsigned char *perm;
- memset (ovl, 0, sizeof(OVERLAPPED));
- ovl->Offset = off;
- ovl->OffsetHigh = off >> 32;
+ while( 1 ) {
+ if( segment < mgr->segments ) {
+ perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
+ memcpy (perm, page, mgr->page_size);
+ if( syncit )
+ msync (perm, mgr->page_size, MS_SYNC);
+ mgr->writes++;
+ return 0;
+ }
- if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
- return BTERR_wrt;
+ bt_mutexlock (mgr->maps);
- if( *amt < mgr->page_size )
- return BTERR_wrt;
-#endif
- mgr->writes++;
- return 0;
+ if( segment < mgr->segments ) {
+ bt_releasemutex (mgr->maps);
+ continue;
+ }
+
+ mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
+ bt_releasemutex (mgr->maps);
+ mgr->segments++;
+ } */
}
// set CLOCK bit in latch
latch->pin |= CLOCK_bit;
latch->pin--;
- // if not doing redo recovery
- // set latch available
-
- if( mgr->redopages )
- if( !(latch->pin & ~CLOCK_bit) ) {
- latch->avail = 1;
-#ifdef unix
- __sync_fetch_and_add (&mgr->available, 1);
-#else
- _InterlockedIncrement(&mgr->available);
-#endif
- }
-
bt_releasemutex(latch->modify);
}
// return the btree cached page address
-// if page is dirty and has not yet been
-// flushed to disk for the current redo
-// recovery buffer, write it out.
BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
{
// return next available latch entry
// and with latch entry locked
-uint bt_availnext (BtMgr *mgr, ushort thread_id)
+uint bt_availnext (BtMgr *mgr)
{
BtLatchSet *latch;
uint entry;
while( 1 ) {
-
- // flush dirty pages if none are available
- // and we aren't doing redo recovery
-
- if( !mgr->redopages )
- if( !mgr->available )
- bt_flushlsn (mgr, thread_id);
-
#ifdef unix
- entry = __sync_fetch_and_add (&mgr->latchavail, 1) + 1;
+ entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
#else
- entry = _InterlockedIncrement (&mgr->latchavail);
+ entry = _InterlockedIncrement (&mgr->latchvictim);
#endif
entry %= mgr->latchtotal;
latch = mgr->latchsets + entry;
- if( !latch->avail )
+ if( !bt_mutextry(latch->modify) )
continue;
- bt_mutexlock(latch->modify);
+ // return this entry if it is not pinned
- if( !latch->avail ) {
- bt_releasemutex(latch->modify);
- continue;
- }
+ if( !latch->pin )
+ return entry;
+
+ // if the CLOCK bit is set
+ // reset it to zero.
- return entry;
+ latch->pin &= ~CLOCK_bit;
+ bt_releasemutex(latch->modify);
}
}
-// find available latchset
+// pin page in buffer pool
// return with latchset pinned
-BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage loadit, ushort thread_id)
+BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, BtPage contents, ushort thread_id)
{
uint hashidx = page_no % mgr->latchhash;
BtLatchSet *latch;
} while( entry = latch->next );
// found our entry: increment pin
- // remove from available status
if( entry ) {
latch = mgr->latchsets + entry;
bt_mutexlock(latch->modify);
- if( latch->avail )
-#ifdef unix
- __sync_fetch_and_add (&mgr->available, -1);
-#else
- _InterlockedDecrement(&mgr->available);
-#endif
- latch->avail = 0;
latch->pin |= CLOCK_bit;
latch->pin++;
-
+if(contents)
+abort();
bt_releasemutex(latch->modify);
bt_releasemutex(mgr->hashtable[hashidx].latch);
return latch;
}
- // find and reuse entry from available set
+ // find and reuse unpinned entry
trynext:
- if( entry = bt_availnext (mgr, thread_id) )
- latch = mgr->latchsets + entry;
- else
- return mgr->line = __LINE__, mgr->err = BTERR_avail, NULL;
+ entry = bt_availnext (mgr);
+ latch = mgr->latchsets + entry;
idx = latch->page_no % mgr->latchhash;
bt_releasemutex (mgr->hashtable[idx].latch);
}
- // remove available status
-
- latch->avail = 0;
-#ifdef unix
- __sync_fetch_and_add (&mgr->available, -1);
-#else
- _InterlockedDecrement(&mgr->available);
-#endif
page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
+ // update permanent page area in btree from buffer pool
+ // no read-lock is required since page is not pinned.
+
if( latch->dirty )
- if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
+ if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
return mgr->line = __LINE__, NULL;
else
latch->dirty = 0;
- if( loadit ) {
- memcpy (page, loadit, mgr->page_size);
+ if( contents ) {
+ memcpy (page, contents, mgr->page_size);
latch->dirty = 1;
- } else
- if( bt_readpage (mgr, page, page_no) )
+ } else if( bt_readpage (mgr, page, page_no) )
return mgr->line = __LINE__, NULL;
// link page as head of hash table chain
latch = mgr->latchsets + slot;
if( latch->dirty ) {
- bt_writepage(mgr, page, latch->page_no);
+ bt_writepage(mgr, page, latch->page_no, 0);
latch->dirty = 0, num++;
}
}
fprintf(stderr, "%d buffer pool pages flushed\n", num);
#ifdef unix
+ while( mgr->segments )
+ munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
+
munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
munmap (mgr->pagezero, mgr->page_size);
#else
// and page(s) of latches and page pool cache
memset (pagezero, 0, 1 << bits);
+ pagezero->alloc->lvl = MIN_lvl - 1;
pagezero->alloc->bits = mgr->page_bits;
bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
+ pagezero->activepages = 2;
// initialize left-most LEAF page in
- // alloc->left.
+ // alloc->left and count of active leaf pages.
bt_putid (pagezero->alloc->left, LEAF_page);
- if( bt_writepage (mgr, pagezero->alloc, 0) ) {
+ ftruncate (mgr->idx, REDO_page << mgr->page_bits);
+
+ if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) {
fprintf (stderr, "Unable to create btree page zero\n");
return bt_mgrclose (mgr), NULL;
}
pagezero->alloc->bits = mgr->page_bits;
for( lvl=MIN_lvl; lvl--; ) {
- slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
+ BtSlot *node = slotptr(pagezero->alloc, 1);
+ node->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
key = keyptr(pagezero->alloc, 1);
key->len = 2; // create stopper key
key->key[0] = 0xff;
val->len = lvl ? BtId : 0;
memcpy (val->value, value, val->len);
- pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
+ pagezero->alloc->min = node->off;
pagezero->alloc->lvl = lvl;
pagezero->alloc->cnt = 1;
pagezero->alloc->act = 1;
+ pagezero->alloc->page_no = MIN_lvl - lvl;
- if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
- fprintf (stderr, "Unable to create btree page zero\n");
+ if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) {
+ fprintf (stderr, "Unable to create btree page\n");
return bt_mgrclose (mgr), NULL;
}
}
fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
return bt_mgrclose (mgr), NULL;
}
- if( mgr->redopages = redopages )
+ if( mgr->redopages = redopages ) {
+ ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits);
mgr->redobuff = valloc (redopages * mgr->page_size);
+ }
#else
flag = PAGE_READWRITE;
mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
- // mark all pool entries as available
-
- for( idx = 1; idx < mgr->latchtotal; idx++ ) {
- latch = mgr->latchsets + idx;
- latch->avail = 1;
- mgr->available++;
- }
-
return mgr;
}
// use empty chain first
// else allocate empty page
- if( page_no = bt_getid(mgr->pagezero->chain) ) {
+ if( page_no = bt_getid(mgr->pagezero->freechain) ) {
if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
set->page = bt_mappage (mgr, set->latch);
else
- return mgr->err = BTERR_struct, mgr->line = __LINE__, -1;
+ return mgr->line = __LINE__, mgr->err = BTERR_struct;
+
+ bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
+ mgr->pagezero->activepages++;
- bt_putid(mgr->pagezero->chain, bt_getid(set->page->right));
bt_releasemutex(mgr->lock);
memcpy (set->page, contents, mgr->page_size);
// unlock allocation latch and
// extend file into new page.
+ mgr->pagezero->activepages++;
+
+ ftruncate (mgr->idx, (uid)(page_no + 1) << mgr->page_bits);
bt_releasemutex(mgr->lock);
- // don't load cache from btree page
+ // don't load cache from btree page, load it from contents
+
+ contents->page_no = page_no;
if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
set->page = bt_mappage (mgr, set->latch);
else
return mgr->err;
- set->latch->dirty = 1;
return 0;
}
// store chain
- memcpy(set->page->right, mgr->pagezero->chain, BtId);
- bt_putid(mgr->pagezero->chain, set->latch->page_no);
+ memcpy(set->page->right, mgr->pagezero->freechain, BtId);
+ bt_putid(mgr->pagezero->freechain, set->latch->page_no);
set->latch->dirty = 1;
set->page->free = 1;
+ // decrement active page count
+ // and unlock allocation page
+
+ mgr->pagezero->activepages--;
+ bt_releasemutex (mgr->lock);
+
// unlock released page
bt_unlockpage (BtLockDelete, set->latch);
bt_unlockpage (BtLockWrite, set->latch);
bt_unpinlatch (mgr, set->latch);
-
- // unlock allocation page
-
- bt_releasemutex (mgr->lock);
}
// a fence key was deleted from a page
bt_putid (value, set->latch->page_no);
ptr = (BtKey*)leftkey;
- if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
+ if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
return mgr->err;
// now delete old fence key
// delete a page and manage keys
// call with page writelocked
-// returns with page unpinned
BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
{
BtKey *ptr;
// cache copy of fence key
- // to post in parent
+ // to remove in parent
ptr = keyptr(set->page, set->page->cnt);
memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
bt_putid (value, set->latch->page_no);
ptr = (BtKey*)higherfence;
- if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
+ if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
return mgr->err;
// delete old lower key to our node
bt_freepage (mgr, right);
bt_unlockpage (BtLockParent, set->latch);
- bt_unpinlatch (mgr, set->latch);
return 0;
}
{
uint slot, idx, found, fence;
BtPageSet set[1];
+BtSlot *node;
BtKey *ptr;
BtVal *val;
- if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) )
+ if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
+ node = slotptr(set->page, slot);
ptr = keyptr(set->page, slot);
- else
+ } else
return mgr->err;
// if librarian slot, advance to real slot
- if( slotptr(set->page, slot)->type == Librarian )
+ if( node->type == Librarian ) {
ptr = keyptr(set->page, ++slot);
+ node = slotptr(set->page, slot);
+ }
fence = slot == set->page->cnt;
- // if key is found delete it, otherwise ignore request
+ // delete the key, ignore request if already dead
if( found = !keycmp (ptr, key, len) )
- if( found = slotptr(set->page, slot)->dead == 0 ) {
+ if( found = node->dead == 0 ) {
val = valptr(set->page,slot);
- slotptr(set->page, slot)->dead = 1;
set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
set->page->act--;
+ // mark node type as delete
+
+ node->type = Delete;
+ node->dead = 1;
+
// collapse empty slots beneath the fence
+	// on interior nodes
- while( idx = set->page->cnt - 1 )
+ if( lvl )
+ while( idx = set->page->cnt - 1 )
if( slotptr(set->page, idx)->dead ) {
*slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
// delete empty page
- if( !set->page->act )
- return bt_deletepage (mgr, set, thread_no);
+ if( !set->page->act ) {
+ if( bt_deletepage (mgr, set, thread_no) )
+ return mgr->err;
+ } else {
+ set->latch->dirty = 1;
+ bt_unlockpage(BtLockWrite, set->latch);
+ }
- set->latch->dirty = 1;
- bt_unlockpage(BtLockWrite, set->latch);
bt_unpinlatch (mgr, set->latch);
return 0;
}
root->page->act = 2;
root->page->lvl++;
+ mgr->pagezero->alloc->lvl = root->page->lvl;
+
// release and unpin root pages
bt_unlockpage(BtLockWrite, root->latch);
bt_putid (value, set->latch->page_no);
ptr = (BtKey *)leftkey;
- if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
+ if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
return mgr->err;
// switch fence for right block of larger keys to new right page
bt_putid (value, right->page_no);
ptr = (BtKey *)rightkey;
- if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, 1, thread_no) )
+ if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
return mgr->err;
bt_unlockpage (BtLockParent, set->latch);
// Insert new key into the btree at given level.
// either add a new key or update/add an existing one
-BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique, ushort thread_no)
+BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
{
-unsigned char newkey[BT_keyarray];
uint slot, idx, len, entry;
BtPageSet set[1];
-BtKey *ptr, *ins;
-uid sequence;
+BtSlot *node;
+BtKey *ptr;
BtVal *val;
-uint type;
-
- // set up the key we're working on
-
- ins = (BtKey*)newkey;
- memcpy (ins->key, key, keylen);
- ins->len = keylen;
-
- // is this a non-unique index value?
-
- if( unique )
- type = Unique;
- else {
- type = Duplicate;
- sequence = bt_newdup (mgr);
- bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
- ins->len += BtId;
- }
while( 1 ) { // find the page and slot for the current key
- if( slot = bt_loadpage (mgr, set, ins->key, ins->len, lvl, BtLockWrite, thread_no) )
+ if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
+ node = slotptr(set->page, slot);
ptr = keyptr(set->page, slot);
- else {
+ } else {
if( !mgr->err )
mgr->line = __LINE__, mgr->err = BTERR_ovflw;
return mgr->err;
// if librarian slot == found slot, advance to real slot
- if( slotptr(set->page, slot)->type == Librarian )
- if( !keycmp (ptr, key, keylen) )
+ if( node->type == Librarian )
+ if( !keycmp (ptr, key, keylen) ) {
ptr = keyptr(set->page, ++slot);
+ node = slotptr(set->page, slot);
+ }
- len = ptr->len;
-
- if( slotptr(set->page, slot)->type == Duplicate )
- len -= BtId;
-
- // if inserting a duplicate key or unique key
+ // if inserting a duplicate key or unique
+ // key that doesn't exist on the page,
// check for adequate space on the page
// and insert the new key before slot.
- if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
- if( !(slot = bt_cleanpage (mgr, set, ins->len, slot, vallen)) )
- if( !(entry = bt_splitpage (mgr, set, thread_no)) )
- return mgr->err;
- else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
- return mgr->err;
- else
- continue;
-
- return bt_insertslot (mgr, set, slot, ins->key, ins->len, value, vallen, type, 1);
- }
+ switch( type ) {
+ case Unique:
+ case Duplicate:
+ if( keycmp (ptr, key, keylen) )
+ if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
+ return bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type, 1);
+ else if( !(entry = bt_splitpage (mgr, set, thread_no)) )
+ return mgr->err;
+ else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
+ return mgr->err;
+ else
+ continue;
- // if key already exists, update value and return
+ // if key already exists, update value and return
- val = valptr(set->page, slot);
+ val = valptr(set->page, slot);
- if( val->len >= vallen ) {
+ if( val->len >= vallen ) {
if( slotptr(set->page, slot)->dead )
set->page->act++;
+ node->type = type;
+ node->dead = 0;
+
set->page->garbage += val->len - vallen;
set->latch->dirty = 1;
- slotptr(set->page, slot)->dead = 0;
val->len = vallen;
memcpy (val->value, value, vallen);
bt_unlockpage(BtLockWrite, set->latch);
bt_unpinlatch (mgr, set->latch);
return 0;
- }
+ }
- // new update value doesn't fit in existing value area
+ // new update value doesn't fit in existing value area
+ // make sure page has room
- if( !slotptr(set->page, slot)->dead )
+ if( !node->dead )
set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
- else {
- slotptr(set->page, slot)->dead = 0;
+ else
set->page->act++;
- }
- if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
- if( !(entry = bt_splitpage (mgr, set, thread_no)) )
+ node->type = type;
+ node->dead = 0;
+
+ if( !(slot = bt_cleanpage (mgr, set, keylen, slot, vallen)) )
+ if( !(entry = bt_splitpage (mgr, set, thread_no)) )
return mgr->err;
- else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
+ else if( bt_splitkeys (mgr, set, mgr->latchsets + entry, thread_no) )
return mgr->err;
- else
+ else
continue;
- set->page->min -= vallen + sizeof(BtVal);
- val = (BtVal*)((unsigned char *)set->page + set->page->min);
- memcpy (val->value, value, vallen);
- val->len = vallen;
+ // copy key and value onto page and update slot
- set->latch->dirty = 1;
- set->page->min -= keylen + sizeof(BtKey);
- ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
- memcpy (ptr->key, key, keylen);
- ptr->len = keylen;
+ set->page->min -= vallen + sizeof(BtVal);
+ val = (BtVal*)((unsigned char *)set->page + set->page->min);
+ memcpy (val->value, value, vallen);
+ val->len = vallen;
+
+ set->latch->dirty = 1;
+ set->page->min -= keylen + sizeof(BtKey);
+ ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
+ memcpy (ptr->key, key, keylen);
+ ptr->len = keylen;
- slotptr(set->page, slot)->off = set->page->min;
- bt_unlockpage(BtLockWrite, set->latch);
- bt_unpinlatch (mgr, set->latch);
- return 0;
+ node->off = set->page->min;
+ bt_unlockpage(BtLockWrite, set->latch);
+ bt_unpinlatch (mgr, set->latch);
+ return 0;
+ }
}
return 0;
}
return mgr->line = __LINE__, mgr->err = BTERR_atomic;
}
+// perform delete from smaller btree
+// insert a delete slot if not found there
+
BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
{
BtKey *key = keyptr(source, src);
BtPageSet set[1];
uint idx, slot;
+BtSlot *node;
BtKey *ptr;
BtVal *val;
- if( slot = bt_atomicpage (mgr, source, locks, src, set) )
+ if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
+ node = slotptr(set->page, slot);
ptr = keyptr(set->page, slot);
- else
+ val = valptr(set->page, slot);
+ } else
return mgr->line = __LINE__, mgr->err = BTERR_struct;
- if( !keycmp (ptr, key->key, key->len) )
- if( !slotptr(set->page, slot)->dead )
- slotptr(set->page, slot)->dead = 1;
- else
- return 0;
- else
+ // if slot is not found, insert a delete slot
+
+ if( keycmp (ptr, key->key, key->len) )
+ return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete, 0);
+
+ // if node is already dead,
+ // ignore the request.
+
+ if( node->dead )
return 0;
- val = valptr(set->page, slot);
set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
set->latch->dirty = 1;
set->page->lsn = locks[src].lsn;
set->page->act--;
+
+	node->dead = 1;
mgr->found++;
return 0;
}
ptr = keyptr(right->page,right->page->cnt);
bt_putid (value, prev->latch->page_no);
- if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, 1, thread_no) )
+ if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
return mgr->err;
// now that master page is in good shape we can
if( right_page_no = bt_getid (prev->page->right) ) {
if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
temp->page = bt_mappage (mgr, temp->latch);
+ else
+ return mgr->err;
bt_lockpage (BtLockWrite, temp->latch, thread_no);
bt_putid (temp->page->left, prev->latch->page_no);
return 0;
}
-// find and add the next available latch entry
-// to the queue
-
-BTERR bt_txnavaillatch (BtDb *bt)
-{
-BtLatchSet *latch;
-uint startattempt;
-uint cnt, entry;
-uint hashidx;
-BtPage page;
-
- // find and reuse previous entry on victim
-
- startattempt = bt->mgr->latchvictim;
-
- while( 1 ) {
-#ifdef unix
- entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
-#else
- entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
-#endif
- // skip entry if it has outstanding pins
-
- entry %= bt->mgr->latchtotal;
-
- if( !entry )
- continue;
-
- // only go around one time before
- // flushing redo recovery buffer,
- // and the buffer pool to free up entries.
-
- if( bt->mgr->redopages )
- if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
- if( bt_mutextry (bt->mgr->dump) ) {
- if( bt_dumpredo (bt->mgr) )
- return bt->mgr->err;
- bt_flushlsn (bt->mgr, bt->thread_no);
- // synchronize the various threads running into this condition
- // so that only one thread does the dump and flush
- } else
- bt_mutexlock(bt->mgr->dump);
-
- startattempt = bt->mgr->latchvictim;
- bt_releasemutex(bt->mgr->dump);
- }
-
- latch = bt->mgr->latchsets + entry;
-
- if( latch->avail )
- continue;
-
- bt_mutexlock(latch->modify);
-
- // skip if already an available entry
-
- if( latch->avail ) {
- bt_releasemutex(latch->modify);
- continue;
- }
-
- // skip this entry if it is pinned
- // if the CLOCK bit is set
- // reset it to zero.
-
- if( latch->pin ) {
- latch->pin &= ~CLOCK_bit;
- bt_releasemutex(latch->modify);
- continue;
- }
-
- page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
-
- // if dirty page has lsn >= last redo recovery buffer
- // then hold page in the buffer pool until next redo
- // recovery buffer is being written to disk.
-
- if( latch->dirty )
- if( page->lsn >= bt->mgr->flushlsn ) {
- bt_releasemutex(latch->modify);
- continue;
- }
-
- // entry is now available
-#ifdef unix
- __sync_fetch_and_add (&bt->mgr->available, 1);
-#else
- _InterlockedIncrement(&bt->mgr->available);
-#endif
- latch->avail = 1;
- bt_releasemutex(latch->modify);
- return 0;
- }
-}
-
-// release available latch requests
-
-void bt_txnavailrelease (BtDb *bt, uint count)
-{
-#ifdef unix
- __sync_fetch_and_add(bt->mgr->availlock, -count);
-#else
- _InterlockedAdd(bt->mgr->availlock, -count);
-#endif
-}
-
-// promote page of keys from first btree
-// into main btree
-
-BTERR bt_txnavailmain (BtDb *bt)
-{
-BtLatchSet *latch;
-uint entry;
-
- while( 1 ) {
-#ifdef unix
- entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
-#else
- entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
-#endif
- // skip entry if it has outstanding pins
-
- entry %= bt->mgr->latchtotal;
-
- if( !entry )
- continue;
-
- latch = bt->mgr->latchsets + entry;
-
- if( latch->avail )
- continue;
-
- bt_mutexlock(latch->modify);
-
- // skip if already an available entry
-
- if( latch->avail ) {
- bt_releasemutex(latch->modify);
- continue;
- }
-
- // skip this entry if it is pinned
- // if the CLOCK bit is set
- // reset it to zero.
-
- if( latch->pin ) {
- latch->pin &= ~CLOCK_bit;
- bt_releasemutex(latch->modify);
- continue;
- }
-
- }
-}
-
-// commit available pool entries
-// find available entries as required
-
-BTERR bt_txnavailrequest (BtDb *bt, uint count)
-{
-#ifdef unix
- __sync_fetch_and_add(bt->mgr->availlock, count);
-#else
- _InterlockedAdd(bt->mgr->availlock, count);
-#endif
-
- // find another available pool entry
-
- while( *bt->mgr->availlock > bt->mgr->available )
- if( bt->mgr->redopages )
- bt_txnavaillatch (bt);
- else
- if( bt_txnavailmain (bt) )
- return bt->mgr->err;
-}
-
// atomic modification of a batch of keys.
// return -1 if BTERR is set
// causing the key constraint violation
// or zero on successful completion.
-int bt_atomictxn (BtDb *bt, BtPage source)
+BTERR bt_atomictxn (BtDb *bt, BtPage source)
{
-uint src, idx, slot, samepage, entry, avail, que = 0;
+uint src, idx, slot, samepage, entry, que = 0;
AtomicKey *head, *tail, *leaf;
BtPageSet set[1], prev[1];
unsigned char value[BtId];
}
}
- // reserve enough buffer pool entries
-
- avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;
+ // add entries to redo log
- if( bt_txnavailrequest (bt, avail) )
- return bt->mgr->err;
+ if( bt->mgr->redopages )
+ if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
+ for( src = 0; src++ < source->cnt; )
+ locks[src].lsn = lsn;
+ else
+ return bt->mgr->err;
// Load the leaf page for each key
// group same page references with reuse bit
if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
set->latch->split = 0;
else
- goto atomicerr;
+ return bt->mgr->err;
if( slotptr(set->page, slot)->type == Librarian )
ptr = keyptr(set->page, ++slot);
// capture current lsn for master page
locks[src].reqlsn = set->page->lsn;
-
- // perform constraint checks
-
- switch( slotptr(source, src)->type ) {
- case Duplicate:
- case Unique:
- if( !slotptr(set->page, slot)->dead )
- if( slot < set->page->cnt || bt_getid (set->page->right) )
- if( !keycmp (ptr, key->key, key->len) ) {
-
- // return constraint violation if key already exists
-
- bt_unlockpage(BtLockRead, set->latch);
- result = src;
-
- while( src ) {
- if( locks[src].entry ) {
- set->latch = bt->mgr->latchsets + locks[src].entry;
- bt_unlockpage(BtLockAtomic, set->latch);
- bt_unpinlatch (bt->mgr, set->latch);
- }
- src--;
- }
- free (locks);
- return result;
- }
- break;
- }
}
// unlock last loadpage lock
if( source->cnt )
bt_unlockpage(BtLockRead, set->latch);
- // and add entries to redo log
-
- if( bt->mgr->redopages )
- if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
- for( src = 0; src++ < source->cnt; )
- locks[src].lsn = lsn;
- else
- goto atomicerr;
-
// obtain write lock for each master page
+ // sync flushed pages to disk
for( src = 0; src++ < source->cnt; ) {
if( locks[src].reuse )
continue;
- else
- bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry, bt->thread_no);
+
+ set->latch = bt->mgr->latchsets + locks[src].entry;
+ bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
}
// insert or delete each key
switch( slotptr(source,idx)->type ) {
case Delete:
if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
- goto atomicerr;
+ return bt->mgr->err;
break;
case Duplicate:
case Unique:
if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
- goto atomicerr;
+ return bt->mgr->err;
break;
}
if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
set->page = bt_mappage (bt->mgr, set->latch);
else
- goto atomicerr;
+ return bt->mgr->err;
bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
bt_putid (set->page->left, prev->latch->page_no);
ptr = keyptr(prev->page,prev->page->cnt);
if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
- goto atomicerr;
+ return bt->mgr->err;
// perform the remainder of the delete
// from the FIFO queue
bt_unlockpage(BtLockWrite, prev->latch);
}
- bt_txnavailrelease (bt, avail);
-
- que *= bt->mgr->pagezero->alloc->lvl;
-
- if( bt_txnavailrequest (bt, que) )
- return bt->mgr->err;
-
// add & delete keys for any pages split or merged during transaction
if( leaf = head )
switch( leaf->type ) {
case 0: // insert key
- if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, 1, bt->thread_no) )
- goto atomicerr;
+ if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) )
+ return bt->mgr->err;
break;
case 1: // delete key
if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
- goto atomicerr;
+ return bt->mgr->err;
break;
case 2: // free page
if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
- goto atomicerr;
+ return bt->mgr->err;
break;
}
free (leaf);
} while( leaf = tail );
- bt_txnavailrelease (bt, que);
+	// if the number of active pages
+	// exceeds the buffer pool size,
+	// promote pages into the larger btree
+
+ while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
+ if( bt_txnpromote (bt) )
+ return bt->mgr->err;
// return success
free (locks);
return 0;
-atomicerr:
- return -1;
+}
+
+// promote a page into the larger btree
+
+BTERR bt_txnpromote (BtDb *bt)
+{
+uint entry, slot, idx;
+BtPageSet set[1];
+BtSlot *node;
+BtKey *ptr;
+BtVal *val;
+
+ while( 1 ) {
+#ifdef unix
+ entry = __sync_fetch_and_add(&bt->mgr->latchpromote, 1);
+#else
+ entry = _InterlockedIncrement (&bt->mgr->latchpromote) - 1;
+#endif
+ entry %= bt->mgr->latchtotal;
+
+ if( !entry )
+ continue;
+
+ set->latch = bt->mgr->latchsets + entry;
+ idx = set->latch->page_no % bt->mgr->latchhash;
+
+ if( !bt_mutextry(set->latch->modify) )
+ continue;
+
+// if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
+// bt_releasemutex(set->latch->modify);
+// continue;
+// }
+
+ // skip this entry if it is pinned
+
+ if( set->latch->pin & ~CLOCK_bit ) {
+ bt_releasemutex(set->latch->modify);
+// bt_releasemutex(bt->mgr->hashtable[idx].latch);
+ continue;
+ }
+
+ set->page = bt_mappage (bt->mgr, set->latch);
+
+ // entry never used or has no right sibling
+
+ if( !set->latch->page_no || !bt_getid (set->page->right) ) {
+ bt_releasemutex(set->latch->modify);
+// bt_releasemutex(bt->mgr->hashtable[idx].latch);
+ continue;
+ }
+
+ bt_lockpage (BtLockAccess, set->latch, bt->thread_no);
+ bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
+ bt_unlockpage(BtLockAccess, set->latch);
+
+	// skip entry if it is an interior node, or is being or was killed
+
+ if( set->page->lvl || set->page->free || set->page->kill ) {
+ bt_releasemutex(set->latch->modify);
+// bt_releasemutex(bt->mgr->hashtable[idx].latch);
+ bt_unlockpage(BtLockWrite, set->latch);
+ continue;
+ }
+
+ // pin the page for our useage
+
+ set->latch->pin++;
+ bt_releasemutex(set->latch->modify);
+// bt_releasemutex(bt->mgr->hashtable[idx].latch);
+
+ // if page is dirty, then
+ // sync it to the disk first.
+
+ if( set->latch->dirty )
+ if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) )
+ return bt->mgr->line = __LINE__, bt->mgr->err;
+ else
+ set->latch->dirty = 0;
+
+ // transfer slots in our selected page to larger btree
+if( !(set->latch->page_no % 100) )
+fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt);
+
+ for( slot = 0; slot++ < set->page->cnt; ) {
+ ptr = keyptr (set->page, slot);
+ val = valptr (set->page, slot);
+ node = slotptr(set->page, slot);
+
+ switch( node->type ) {
+ case Duplicate:
+ case Unique:
+ if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) )
+ return bt->main->err;
+
+ continue;
+
+ case Delete:
+ if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) )
+ return bt->main->err;
+
+ continue;
+ }
+ }
+
+ // now delete the page
+
+ if( bt_deletepage (bt->mgr, set, bt->thread_no) )
+ return bt->mgr->err;
+
+ bt_unpinlatch (bt->mgr, set->latch);
+ return 0;
+ }
}
// set cursor to highest slot on highest page
if( *latch->access->rin & MASK )
fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
- if( *latch->parent->exclusive )
- fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
+// if( *latch->parent->xcl->value )
+// fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
- if( *latch->atomic->exclusive )
- fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
+// if( *latch->atomic->xcl->value )
+// fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
- if( *latch->modify->exclusive )
- fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
+// if( *latch->modify->value )
+// fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
if( latch->pin & ~CLOCK_bit )
fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
line++;
if( !args->num ) {
- if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, 1, bt->thread_no) )
+ if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
len = 0;
continue;
{
line++;
- if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, 1, bt->thread_no) )
+ if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
len = 0;
}
cnt++;
}
- set->latch->avail = 1;
bt_unlockpage (BtLockRead, set->latch);
bt_unpinlatch (bt->mgr, set->latch);
} while( page_no = next );
posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
#endif
fprintf(stderr, "started counting\n");
- page_no = LEAF_page;
+	next = LEAF_page + bt->mgr->redopages + 1;
+	page_no = next++;
while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
if( bt_readpage (bt->mgr, bt->cursor, page_no) )
if( !bt->cursor->free && !bt->cursor->lvl )
cnt += bt->cursor->act;
- page_no++;
+ bt->mgr->reads++;
+ page_no = next++;
}
cnt--; // remove stopper key