// redo log for failure recovery
// and LSM B-trees for write optimization
-// 11 OCT 2014
+// 15 OCT 2014
// author: karl malbrain, malbrain@cal.berkeley.edu
BtLockAtomic = 32
} BtLock;
-// lite weight spin latch
-
-volatile typedef struct {
- ushort exclusive:1;
- ushort pending:1;
- ushort share:14;
-} BtMutexLatch;
-
-#define XCL 1
-#define PEND 2
-#define BOTH 3
-#define SHARE 4
-
-/*
-// exclusive is set for write access
-
-volatile typedef struct {
- unsigned char exclusive[1];
- unsigned char filler;
-} BtMutexLatch;
-*/
-/*
typedef struct {
union {
struct {
- volatile uint xlock:1; // one writer has exclusive lock
- volatile uint wrt:31; // count of other writers waiting
+ volatile ushort xlock[1]; // one writer has exclusive lock
+ volatile ushort wrt[1]; // count of other writers waiting
} bits[1];
uint value[1];
};
} BtMutexLatch;
-*/
+
#define XCL 1
-#define WRT 2
+#define WRT 65536
// definition for phase-fair reader/writer lock implementation
volatile ushort rout[1];
volatile ushort ticket[1];
volatile ushort serving[1];
- volatile ushort tid[1];
- volatile ushort dup[1];
} RWLock;
-// write only lock
+// write only reentrant lock
typedef struct {
- BtMutexLatch xcl[1];
- volatile ushort tid[1];
- volatile ushort dup[1];
+ BtMutexLatch xcl[1];
+ union {
+ struct {
+ volatile ushort tid[1];
+ volatile ushort dup[1];
+ } bits[1];
+ uint value[1];
+ };
+ volatile uint waiters[1];
} WOLock;
#define PHID 0x1
WOLock parent[1]; // Posting of fence key in parent
WOLock atomic[1]; // Atomic update in progress
uint split; // right split page atomic insert
- uint entry; // entry slot in latch table
uint next; // next entry in hash table chain
uint prev; // prev entry in hash table chain
ushort pin; // number of accessing threads
- unsigned char dirty; // page in cache is dirty (atomic set)
+ unsigned char dirty; // page in cache is dirty (atomic setable)
+ unsigned char promote; // page in cache is dirty (atomic setable)
BtMutexLatch modify[1]; // modify entry lite latch
} BtLatchSet;
// note that this structure size
// must be a multiple of 8 bytes
-// in order to place dups correctly.
+// in order to place PageZero correctly.
typedef struct BtPage_ {
uint cnt; // count of keys in page
typedef struct {
struct BtPage_ alloc[1]; // next page_no in right ptr
- unsigned long long dups[1]; // global duplicate key uniqueifier
unsigned char freechain[BtId]; // head of free page_nos chain
- unsigned long long activepages; // number of active pages pages
+ unsigned long long activepages; // number of active pages
+ uint redopages; // number of redo pages in file
} BtPageZero;
// The object structure for Btree access
uint latchhash; // number of latch hash table slots
uint latchvictim; // next latch entry to examine
uint latchpromote; // next latch entry to promote
- uint redopages; // size of recovery buff in pages
uint redolast; // last msync size of recovery buff
uint redoend; // eof/end element in recovery buff
int err; // last error
unsigned char key[BT_keyarray]; // last found complete key
} BtDb;
+// atomic txn structures
+
+typedef struct {
+ logseqno reqlsn; // redo log seq no required
+ uint entry; // latch table entry number
+ uint slot:31; // page slot number
+ uint reuse:1; // reused previous page
+} AtomicTxn;
+
// Catastrophic errors
typedef enum {
extern void bt_close (BtDb *bt);
extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
-extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit);
+extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no);
extern void bt_unlockpage(BtLock mode, BtLatchSet *latch);
extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
-// transaction functions
+// atomic transaction functions
+BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
BTERR bt_txnpromote (BtDb *bt);
// The page is allocated from low and hi ends.
return id;
}
-uid bt_newdup (BtMgr *mgr)
-{
-#ifdef unix
- return __sync_fetch_and_add (mgr->pagezero->dups, 1) + 1;
-#else
- return _InterlockedIncrement64(mgr->pagezero->dups, 1);
-#endif
-}
-
// lite weight spin lock Latch Manager
int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
void bt_mutexlock(BtMutexLatch *latch)
{
-ushort prev;
-
- do {
-#ifdef unix
- prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
-#else
- prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
-#endif
- if( !(prev & XCL) )
- if( !(prev & ~BOTH) )
- return;
- else
-#ifdef unix
- __sync_fetch_and_and ((ushort *)latch, ~XCL);
-#else
- _InterlockedAnd16((ushort *)latch, ~XCL);
-#endif
-#ifdef unix
- } while( sched_yield(), 1 );
-#else
- } while( SwitchToThread(), 1 );
-#endif
-/*
-unsigned char prev;
-
- do {
-#ifdef unix
- prev = __sync_fetch_and_or(latch->exclusive, XCL);
-#else
- prev = _InterlockedOr8(latch->exclusive, XCL);
-#endif
- if( !(prev & XCL) )
- return;
-#ifdef unix
- } while( sched_yield(), 1 );
-#else
- } while( SwitchToThread(), 1 );
-#endif
-*/
-/*
BtMutexLatch prev[1];
uint slept = 0;
while( 1 ) {
*prev->value = __sync_fetch_and_or(latch->value, XCL);
- if( !prev->bits->xlock ) { // did we set XCL bit?
+ if( !*prev->bits->xlock ) { // did we set XCL?
if( slept )
__sync_fetch_and_sub(latch->value, WRT);
return;
}
if( !slept ) {
- prev->bits->wrt++;
+ *prev->bits->wrt += 1;
__sync_fetch_and_add(latch->value, WRT);
}
- sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
+ sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
slept = 1;
- } */
+ }
}
// try to obtain write lock
int bt_mutextry(BtMutexLatch *latch)
{
-ushort prev;
-
-#ifdef unix
- prev = __sync_fetch_and_or((ushort *)latch, XCL);
-#else
- prev = _InterlockedOr16((ushort *)latch, XCL);
-#endif
- // take write access if all bits are clear
-
- if( !(prev & XCL) )
- if( !(prev & ~BOTH) )
- return 1;
- else
-#ifdef unix
- __sync_fetch_and_and ((ushort *)latch, ~XCL);
-#else
- _InterlockedAnd16((ushort *)latch, ~XCL);
-#endif
- return 0;
-/*
-unsigned char prev;
-
-#ifdef unix
- prev = __sync_fetch_and_or(latch->exclusive, XCL);
-#else
- prev = _InterlockedOr8(latch->exclusive, XCL);
-#endif
- // take write access if all bits are clear
-
- return !(prev & XCL);
-*/
-/*
BtMutexLatch prev[1];
*prev->value = __sync_fetch_and_or(latch->value, XCL);
- // take write access if exclusive bit is clear
+ // take write access if exclusive bit was clear
- return !prev->bits->xlock;
-*/
+ return !*prev->bits->xlock;
}
// clear write mode
void bt_releasemutex(BtMutexLatch *latch)
{
-#ifdef unix
- __sync_fetch_and_and((ushort *)latch, ~BOTH);
-#else
- _InterlockedAnd16((ushort *)latch, ~BOTH);
-#endif
-/*
- *latch->exclusive = 0;
-*/
-/*
BtMutexLatch prev[1];
*prev->value = __sync_fetch_and_and(latch->value, ~XCL);
- if( prev->bits->wrt )
- sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
-*/
+ if( *prev->bits->wrt )
+ sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
}
-// Write-Only Queue Lock
+// Write-Only Reentrant Lock
void WriteOLock (WOLock *lock, ushort tid)
{
- if( *lock->tid == tid ) {
- *lock->dup += 1;
+uint prev, waited = 0;
+
+ while( 1 ) {
+ bt_mutexlock(lock->xcl);
+
+ if( waited )
+ *lock->waiters -= 1;
+
+ if( *lock->bits->tid == tid ) {
+ *lock->bits->dup += 1;
+ bt_releasemutex(lock->xcl);
+ return;
+ }
+ if( !*lock->bits->tid ) {
+ *lock->bits->tid = tid;
+ bt_releasemutex(lock->xcl);
return;
}
- bt_mutexlock(lock->xcl);
- *lock->tid = tid;
+ waited = 1;
+ *lock->waiters += 1;
+ prev = *lock->value;
+
+ bt_releasemutex(lock->xcl);
+
+ sys_futex( lock->value, FUTEX_WAIT_BITSET_PRIVATE, prev, NULL, NULL, QueWr );
+ }
}
void WriteORelease (WOLock *lock)
{
- if( *lock->dup ) {
- *lock->dup -= 1;
- return;
+ bt_mutexlock(lock->xcl);
+
+ if( *lock->bits->dup ) {
+ *lock->bits->dup -= 1;
+ bt_releasemutex(lock->xcl);
+ return;
+ }
+
+ *lock->bits->tid = 0;
+
+ if( *lock->waiters )
+ sys_futex( lock->value, FUTEX_WAKE_BITSET_PRIVATE, 32768, NULL, NULL, QueWr );
+ bt_releasemutex(lock->xcl);
+}
+
+// clear lock of holders and waiters
+
+ClearWOLock (WOLock *lock)
+{
+ while( 1 ) {
+ bt_mutexlock(lock->xcl);
+
+ if( *lock->waiters ) {
+ bt_releasemutex(lock->xcl);
+ sched_yield();
+ continue;
+ }
+
+ if( *lock->bits->tid ) {
+ bt_releasemutex(lock->xcl);
+ sched_yield();
+ continue;
}
- *lock->tid = 0;
bt_releasemutex(lock->xcl);
+ return;
+ }
}
// Phase-Fair reader/writer lock implementation
{
ushort w, r, tix;
- if( *lock->tid == tid ) {
- *lock->dup += 1;
- return;
- }
#ifdef unix
tix = __sync_fetch_and_add (lock->ticket, 1);
#else
#else
SwitchToThread();
#endif
- *lock->tid = tid;
}
void WriteRelease (RWLock *lock)
{
- if( *lock->dup ) {
- *lock->dup -= 1;
- return;
- }
-
- *lock->tid = 0;
#ifdef unix
__sync_fetch_and_and (lock->rin, ~MASK);
#else
{
ushort w;
- // OK if write lock already held by same thread
-
- if( *lock->tid == tid ) {
- *lock->dup += 1;
- return 1;
- }
#ifdef unix
w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
#else
{
ushort w;
- if( *lock->tid == tid ) {
- *lock->dup += 1;
- return;
- }
#ifdef unix
w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
#else
void ReadRelease (RWLock *lock)
{
- if( *lock->dup ) {
- *lock->dup -= 1;
- return;
- }
-
#ifdef unix
__sync_fetch_and_add (lock->rout, RINC);
#else
void bt_flushlsn (BtMgr *mgr, ushort thread_no)
{
uint cnt3 = 0, cnt2 = 0, cnt = 0;
+uint entry, segment;
BtLatchSet *latch;
BtPage page;
-uint entry;
// flush dirty pool pages to the btree
bt_lockpage(BtLockRead, latch, thread_no);
if( latch->dirty ) {
- bt_writepage(mgr, page, latch->page_no, 0);
+ bt_writepage(mgr, page, latch->page_no);
latch->dirty = 0, cnt++;
}
if( latch->pin & ~CLOCK_bit )
}
fprintf(stderr, "End flushlsn %d pages %d pinned\n", cnt, cnt2);
fprintf(stderr, "begin sync");
- sync_file_range (mgr->idx, 0, 0, SYNC_FILE_RANGE_WAIT_AFTER);
+ for( segment = 0; segment < mgr->segments; segment++ )
+ if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
fprintf(stderr, " end sync\n");
}
BtKey *key;
BtVal *val;
- pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
-
hdr = (BtLogHdr *)mgr->redobuff;
mgr->flushlsn = hdr->lsn;
}
// recovery manager -- append new entry to recovery log
-// flush to disk when it overflows.
+// flush dirty pages to disk when it overflows.
logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
{
-uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
+uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
uint amt = sizeof(BtLogHdr);
BtLogHdr *hdr, *eof;
uint last, end;
if( amt > size - mgr->redoend ) {
mgr->flushlsn = mgr->lsn;
- msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
+ if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
mgr->redolast = 0;
mgr->redoend = 0;
bt_flushlsn(mgr, thread_no);
memset (eof, 0, sizeof(BtLogHdr));
eof->lsn = mgr->lsn;
- last = mgr->redolast & 0xfff;
+ last = mgr->redolast & ~0xfff;
end = mgr->redoend;
- mgr->redolast = end;
- bt_releasemutex(mgr->redo);
+ if( end - last + sizeof(BtLogHdr) >= 32768 )
+ if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+ else
+ mgr->redolast = end;
- msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
+ bt_releasemutex(mgr->redo);
return hdr->lsn;
}
// recovery manager -- append transaction to recovery log
-// flush to disk when it overflows.
+// flush dirty pages to disk when it overflows.
logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
{
-uint size = mgr->page_size * mgr->redopages - sizeof(BtLogHdr);
+uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
uint amt = 0, src, type;
BtLogHdr *hdr, *eof;
uint last, end;
if( amt > size - mgr->redoend ) {
mgr->flushlsn = mgr->lsn;
- msync (mgr->redobuff + (mgr->redolast & 0xfff), mgr->redoend - mgr->redolast + sizeof(BtLogHdr) + 4096, MS_SYNC);
+ if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
mgr->redolast = 0;
mgr->redoend = 0;
bt_flushlsn (mgr, thread_no);
memset (eof, 0, sizeof(BtLogHdr));
eof->lsn = lsn;
- last = mgr->redolast & 0xfff;
+ last = mgr->redolast & ~0xfff;
end = mgr->redoend;
- mgr->redolast = end;
- bt_releasemutex(mgr->redo);
- msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC);
+ if( end - last + sizeof(BtLogHdr) >= 32768 )
+ if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+ else
+ mgr->redolast = end;
+
bt_releasemutex(mgr->redo);
return lsn;
}
-// read page into buffer pool from permanent location in Btree file
+// sync a single btree page to disk
-BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
+BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
{
-off64_t off = page_no << mgr->page_bits;
+uint segment = latch->page_no >> 16;
+BtPage perm;
- if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
- fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
- return BTERR_read;
- }
-if( page->page_no != page_no )
-abort();
- mgr->reads++;
+ if( bt_writepage (mgr, page, latch->page_no) )
+ return mgr->err;
+
+ perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
+
+ if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+
+ latch->dirty = 0;
return 0;
-/*
+}
+
+// read page into buffer pool from permanent location in Btree file
+
+BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
+{
int flag = PROT_READ | PROT_WRITE;
-uint segment = page_no >> 32;
-unsigned char *perm;
+uint segment = page_no >> 16;
+BtPage perm;
while( 1 ) {
if( segment < mgr->segments ) {
- perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
- memcpy (page, perm, mgr->page_size);
-if( page->page_no != page_no )
+ perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
+if( perm->page_no != page_no )
abort();
+ memcpy (page, perm, mgr->page_size);
mgr->reads++;
return 0;
}
mgr->segments++;
bt_releasemutex (mgr->maps);
- } */
+ }
}
// write page to permanent location in Btree file
// clear the dirty bit
-BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, int syncit)
+BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
{
-off64_t off = page_no << mgr->page_bits;
-
- if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size ) {
- fprintf (stderr, "Unable to write page %d errno = %d\n", page_no, errno);
- return BTERR_wrt;
- }
- mgr->writes++;
- return 0;
-/*
int flag = PROT_READ | PROT_WRITE;
-uint segment = page_no >> 32;
-unsigned char *perm;
+uint segment = page_no >> 16;
+BtPage perm;
while( 1 ) {
if( segment < mgr->segments ) {
- perm = mgr->pages[segment] + ((page_no & 0xffffffff) << mgr->page_bits);
+ perm = (BtPage)(mgr->pages[segment] + ((page_no & 0xffff) << mgr->page_bits));
+if( page_no > LEAF_page && perm->page_no != page_no)
+abort();
memcpy (perm, page, mgr->page_size);
- if( syncit )
- msync (perm, mgr->page_size, MS_SYNC);
mgr->writes++;
return 0;
}
mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, mgr->segments << (mgr->page_bits + 16));
bt_releasemutex (mgr->maps);
mgr->segments++;
- } */
+ }
}
// set CLOCK bit in latch
BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
{
-BtPage page = (BtPage)(((uid)latch->entry << mgr->page_bits) + mgr->pagepool);
+uid entry = latch - mgr->latchsets;
+BtPage page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
return page;
}
// no read-lock is required since page is not pinned.
if( latch->dirty )
- if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
+ if( mgr->err = bt_writepage (mgr, page, latch->page_no) )
return mgr->line = __LINE__, NULL;
else
latch->dirty = 0;
memcpy (page, contents, mgr->page_size);
latch->dirty = 1;
} else if( bt_readpage (mgr, page, page_no) )
- return mgr->line = __LINE__, NULL;
+ return mgr->line = __LINE__, NULL;
// link page as head of hash table chain
// if this is a never before used entry,
latch->pin = CLOCK_bit | 1;
latch->page_no = page_no;
- latch->entry = entry;
latch->split = 0;
bt_releasemutex (latch->modify);
if( mgr->redoend ) {
eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
memset (eof, 0, sizeof(BtLogHdr));
-
- pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
}
// write remaining dirty pool pages to the btree
latch = mgr->latchsets + slot;
if( latch->dirty ) {
- bt_writepage(mgr, page, latch->page_no, 0);
+ bt_writepage(mgr, page, latch->page_no);
latch->dirty = 0, num++;
}
}
- // flush last batch to disk and clear
- // redo recovery buffer on disk.
-
- fdatasync (mgr->idx);
+ // clear redo recovery buffer on disk.
- if( mgr->redopages ) {
+ if( mgr->pagezero->redopages ) {
eof = (BtLogHdr *)mgr->redobuff;
memset (eof, 0, sizeof(BtLogHdr));
eof->lsn = mgr->lsn;
-
- pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
-
- sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+ if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
}
fprintf(stderr, "%d buffer pool pages flushed\n", num);
CloseHandle(mgr->hpool);
#endif
#ifdef unix
- free (mgr->redobuff);
close (mgr->idx);
free (mgr);
#else
memset (pagezero, 0, 1 << bits);
pagezero->alloc->lvl = MIN_lvl - 1;
pagezero->alloc->bits = mgr->page_bits;
- bt_putid(pagezero->alloc->right, redopages + MIN_lvl+1);
+ pagezero->redopages = redopages;
+
+ bt_putid(pagezero->alloc->right, pagezero->redopages + MIN_lvl+1);
pagezero->activepages = 2;
// initialize left-most LEAF page in
// alloc->left and count of active leaf pages.
bt_putid (pagezero->alloc->left, LEAF_page);
+ ftruncate (mgr->idx, (REDO_page + pagezero->redopages) << mgr->page_bits);
- ftruncate (mgr->idx, REDO_page << mgr->page_bits);
-
- if( bt_writepage (mgr, pagezero->alloc, 0, 1) ) {
+ if( bt_writepage (mgr, pagezero->alloc, 0) ) {
fprintf (stderr, "Unable to create btree page zero\n");
return bt_mgrclose (mgr), NULL;
}
pagezero->alloc->act = 1;
pagezero->alloc->page_no = MIN_lvl - lvl;
- if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, 1) ) {
+ if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
fprintf (stderr, "Unable to create btree page\n");
return bt_mgrclose (mgr), NULL;
}
VirtualFree (pagezero, 0, MEM_RELEASE);
#endif
#ifdef unix
- // mlock the pagezero page
+ // mlock the first segment of 64K pages
flag = PROT_READ | PROT_WRITE;
- mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
- if( mgr->pagezero == MAP_FAILED ) {
- fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
+ mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
+ mgr->segments = 1;
+
+ if( mgr->pages[0] == MAP_FAILED ) {
+ fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
return bt_mgrclose (mgr), NULL;
}
+
+ mgr->pagezero = (BtPageZero *)mgr->pages[0];
mlock (mgr->pagezero, mgr->page_size);
+ mgr->redobuff = mgr->pages[0] + REDO_page * mgr->page_size;
+ mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
+
mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
if( mgr->pagepool == MAP_FAILED ) {
fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
return bt_mgrclose (mgr), NULL;
}
- if( mgr->redopages = redopages ) {
- ftruncate (mgr->idx, (REDO_page + redopages) << mgr->page_bits);
- mgr->redobuff = valloc (redopages * mgr->page_size);
- }
#else
flag = PAGE_READWRITE;
mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
return bt_mgrclose (mgr), NULL;
}
- if( mgr->redopages = redopages )
- mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
#endif
mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
WriteOLock (latch->atomic, thread_no);
ReadLock (latch->readwr, thread_no);
break;
+ case BtLockAtomic | BtLockWrite:
+ WriteOLock (latch->atomic, thread_no);
+ WriteLock (latch->readwr, thread_no);
+ break;
}
}
WriteORelease (latch->atomic);
ReadRelease (latch->readwr);
break;
+ case BtLockAtomic | BtLockWrite:
+ WriteORelease (latch->atomic);
+ WriteRelease (latch->readwr);
+ break;
}
}
bt_mutexlock(mgr->lock);
// use empty chain first
- // else allocate empty page
+ // else allocate new page
if( page_no = bt_getid(mgr->pagezero->freechain) ) {
if( set->latch = bt_pinlatch (mgr, page_no, NULL, thread_id) )
else
return mgr->line = __LINE__, mgr->err = BTERR_struct;
- bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
mgr->pagezero->activepages++;
+ bt_putid(mgr->pagezero->freechain, bt_getid(set->page->right));
- bt_releasemutex(mgr->lock);
+ // the page is currently free and this
+ // will keep bt_txnpromote out.
- memcpy (set->page, contents, mgr->page_size);
+ // contents will replace this bit
+ // and pin will keep bt_txnpromote out
+
+ contents->page_no = page_no;
set->latch->dirty = 1;
+
+ memcpy (set->page, contents, mgr->page_size);
+
+ if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+
+ bt_releasemutex(mgr->lock);
return 0;
}
// extend file into new page.
mgr->pagezero->activepages++;
-
- ftruncate (mgr->idx, (uid)(page_no + 1) << mgr->page_bits);
+ if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
bt_releasemutex(mgr->lock);
- // don't load cache from btree page, load it from contents
+ // keep bt_txnpromote out of this page
+ contents->free = 1;
contents->page_no = page_no;
+ pwrite (mgr->idx, contents, mgr->page_size, page_no << mgr->page_bits);
+
+ // don't load cache from btree page, load it from contents
if( set->latch = bt_pinlatch (mgr, page_no, contents, thread_id) )
set->page = bt_mappage (mgr, set->latch);
else
return mgr->err;
+ // now pin will keep bt_txnpromote out
+
+ set->page->free = 0;
return 0;
}
int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
{
-uid page_no = ROOT_page, prevpage = 0;
+uid page_no = ROOT_page, prevpage_no = 0;
uint drill = 0xff, slot;
BtLatchSet *prevlatch;
uint mode, prevmode;
+BtPage prevpage;
BtVal *val;
// start at root of btree and drill down
bt_lockpage(BtLockAccess, set->latch, thread_no);
set->page = bt_mappage (mgr, set->latch);
+if( set->latch->promote )
+abort();
// release & unpin parent or left sibling page
- if( prevpage ) {
+ if( prevpage_no ) {
bt_unlockpage(prevmode, prevlatch);
bt_unpinlatch (mgr, prevlatch);
- prevpage = 0;
+ prevpage_no = 0;
}
// obtain mode lock using lock chaining through AccessLock
}
}
- prevpage = set->latch->page_no;
+ prevpage_no = set->latch->page_no;
prevlatch = set->latch;
+ prevpage = set->page;
prevmode = mode;
// find key on page at this level
// return page to free list
// page must be delete & write locked
+// and have no keys pointing to it.
void bt_freepage (BtMgr *mgr, BtPageSet *set)
{
memcpy(set->page->right, mgr->pagezero->freechain, BtId);
bt_putid(mgr->pagezero->freechain, set->latch->page_no);
+ set->latch->promote = 0;
set->latch->dirty = 1;
set->page->free = 1;
// decrement active page count
- // and unlock allocation page
mgr->pagezero->activepages--;
- bt_releasemutex (mgr->lock);
+
+ if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
+ fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
// unlock released page
+ // and unlock allocation page
bt_unlockpage (BtLockDelete, set->latch);
bt_unlockpage (BtLockWrite, set->latch);
bt_unpinlatch (mgr, set->latch);
+ bt_releasemutex (mgr->lock);
}
// a fence key was deleted from a page
// delete a page and manage keys
// call with page writelocked
-BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
+// returns with page removed
+// from the page pool.
+
+BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, int delkey)
{
unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
unsigned char value[BtId];
// pull contents of right peer into our empty page
memcpy (set->page, right->page, mgr->page_size);
+ set->page->page_no = set->latch->page_no;
set->latch->dirty = 1;
// mark right page deleted and point it to left page
ptr = (BtKey*)lowerfence;
- if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
+ if( delkey )
+ if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
return mgr->err;
// obtain delete and write locks to right node
bt_freepage (mgr, right);
bt_unlockpage (BtLockParent, set->latch);
+ bt_unpinlatch (mgr, set->latch);
return 0;
}
break;
}
+ if( !found )
+ return 0;
+
// did we delete a fence key in an upper level?
- if( found && lvl && set->page->act && fence )
+ if( lvl && set->page->act && fence )
if( bt_fixfence (mgr, set, lvl, thread_no) )
return mgr->err;
else
// do we need to collapse root?
- if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
+ if( set->latch->page_no == ROOT_page && set->page->act == 1 )
if( bt_collapseroot (mgr, set, thread_no) )
return mgr->err;
else
// delete empty page
- if( !set->page->act ) {
- if( bt_deletepage (mgr, set, thread_no) )
- return mgr->err;
- } else {
- set->latch->dirty = 1;
- bt_unlockpage(BtLockWrite, set->latch);
- }
+ if( !set->page->act )
+ return bt_deletepage (mgr, set, thread_no, 1);
+ set->latch->dirty = 1;
+ bt_unlockpage(BtLockWrite, set->latch);
bt_unpinlatch (mgr, set->latch);
return 0;
}
unsigned char value[BtId];
BtPageSet left[1];
uid left_page_no;
+BtPage frame;
BtKey *ptr;
BtVal *val;
+ frame = malloc (mgr->page_size);
+ memcpy (frame, root->page, mgr->page_size);
+
// save left page fence key for new root
ptr = keyptr(root->page, root->page->cnt);
// Obtain an empty page to use, and copy the current
// root contents into it, e.g. lower keys
- if( bt_newpage(mgr, left, root->page, page_no) )
+ if( bt_newpage(mgr, left, frame, page_no) )
return mgr->err;
left_page_no = left->latch->page_no;
bt_unpinlatch (mgr, left->latch);
+ free (frame);
// preserve the page info at the bottom
// of higher keys and set rest to zero
// split already locked full node
// leave it locked.
// return pool entry for new right
-// page, unlocked
+// page, pinned & unlocked
uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
{
bt_putid(set->page->right, right->latch->page_no);
set->page->min = nxt;
set->page->cnt = idx;
+ free(frame);
- return right->latch->entry;
+ return right->latch - mgr->latchsets;
}
// fix keys for newly split page
-// call with page locked, return
-// unlocked
+// call with both pages pinned & locked
+// return unlocked and unpinned
BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
{
BtSlot *node;
BtKey *ptr;
BtVal *val;
+int rate;
// if found slot > desired slot and previous slot
// is a librarian slot, use it
if( slotptr(set->page, idx)->dead )
break;
- // now insert key into array before slot
+ // now insert key into array before slot,
+ // adding as many librarian slots as
+ // makes sense.
- if( idx == set->page->cnt )
- idx += 2, set->page->cnt += 2, librarian = 2;
- else
- librarian = 1;
+ if( idx == set->page->cnt ) {
+ int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
- set->latch->dirty = 1;
- set->page->act++;
+ librarian = ++idx - slot;
+ avail /= sizeof(BtSlot);
- while( idx > slot + librarian - 1 )
- *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
+ if( avail < 0 )
+ avail = 0;
- // add librarian slot
+ if( librarian > avail )
+ librarian = avail;
- if( librarian > 1 ) {
- node = slotptr(set->page, slot++);
- node->off = set->page->min;
- node->type = Librarian;
- node->dead = 1;
+ if( librarian ) {
+ rate = (idx - slot) / librarian;
+ set->page->cnt += librarian;
+ idx += librarian;
+ } else
+ rate = 0;
+ } else
+ librarian = 0, rate = 0;
+
+ while( idx > slot ) {
+ // transfer slot
+ *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
+ idx--;
+
+ // add librarian slot per rate
+
+ if( librarian )
+ if( (idx - slot + 1)/2 <= librarian * rate ) {
+// if( rate && !(idx % rate) ) {
+ node = slotptr(set->page, idx--);
+ node->off = node[1].off;
+ node->type = Librarian;
+ node->dead = 1;
+ librarian--;
+ }
}
+if(librarian)
+abort();
+ set->latch->dirty = 1;
+ set->page->act++;
// fill in new slot
return 0;
}
-typedef struct {
- logseqno reqlsn; // redo log seq no required
- logseqno lsn; // redo log sequence number
- uint entry; // latch table entry number
- uint slot:31; // page slot number
- uint reuse:1; // reused previous page
-} AtomicTxn;
-
-typedef struct {
- uid page_no; // page number for split leaf
- void *next; // next key to insert
- uint entry:29; // latch table entry number
- uint type:2; // 0 == insert, 1 == delete, 2 == free
- uint nounlock:1; // don't unlock ParentModification
- unsigned char leafkey[BT_keyarray];
-} AtomicKey;
-
// determine actual page where key is located
// return slot number
return 0;
}
-BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
+BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
{
BtKey *key = keyptr(source, src);
BtVal *val = valptr(source, src);
if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
return mgr->err;
- set->page->lsn = locks[src].lsn;
+ set->page->lsn = lsn;
return 0;
}
+ // split page
+
if( entry = bt_splitpage (mgr, set, thread_no) )
latch = mgr->latchsets + entry;
else
return mgr->err;
// splice right page into split chain
- // and WriteLock it.
+ // and WriteLock it
bt_lockpage(BtLockWrite, latch, thread_no);
latch->split = set->latch->split;
// perform delete from smaller btree
// insert a delete slot if not found there
-BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
+BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
{
BtKey *key = keyptr(source, src);
BtPageSet set[1];
set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
set->latch->dirty = 1;
- set->page->lsn = locks[src].lsn;
+ set->page->lsn = lsn;
set->page->act--;
node->dead = 0;
- mgr->found++;
+ __sync_fetch_and_add(&mgr->found, 1);
return 0;
}
-// delete an empty master page for a transaction
-
-// note that the far right page never empties because
-// it always contains (at least) the infinite stopper key
-// and that all pages that don't contain any keys are
-// deleted, or are being held under Atomic lock.
-
-BTERR bt_atomicfree (BtMgr *mgr, BtPageSet *prev, ushort thread_no)
+int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
{
-BtPageSet right[1], temp[1];
-unsigned char value[BtId];
-uid right_page_no;
-BtKey *ptr;
-
- bt_lockpage(BtLockWrite, prev->latch, thread_no);
-
- // grab the right sibling
-
- if( right->latch = bt_pinlatch(mgr, bt_getid (prev->page->right), NULL, thread_no) )
- right->page = bt_mappage (mgr, right->latch);
- else
- return mgr->err;
-
- bt_lockpage(BtLockAtomic, right->latch, thread_no);
- bt_lockpage(BtLockWrite, right->latch, thread_no);
-
- // and pull contents over empty page
- // while preserving master's left link
+BtKey *key1 = (BtKey *)((char *)page + slot1->off);
+BtKey *key2 = (BtKey *)((char *)page + slot2->off);
- memcpy (right->page->left, prev->page->left, BtId);
- memcpy (prev->page, right->page, mgr->page_size);
-
- // forward seekers to old right sibling
- // to new page location in set
-
- bt_putid (right->page->right, prev->latch->page_no);
- right->latch->dirty = 1;
- right->page->kill = 1;
-
- // remove pointer to right page for searchers by
- // changing right fence key to point to the master page
-
- ptr = keyptr(right->page,right->page->cnt);
- bt_putid (value, prev->latch->page_no);
-
- if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
- return mgr->err;
-
- // now that master page is in good shape we can
- // remove its locks.
-
- bt_unlockpage (BtLockAtomic, prev->latch);
- bt_unlockpage (BtLockWrite, prev->latch);
-
- // fix master's right sibling's left pointer
- // to remove scanner's poiner to the right page
-
- if( right_page_no = bt_getid (prev->page->right) ) {
- if( temp->latch = bt_pinlatch (mgr, right_page_no, NULL, thread_no) )
- temp->page = bt_mappage (mgr, temp->latch);
- else
- return mgr->err;
-
- bt_lockpage (BtLockWrite, temp->latch, thread_no);
- bt_putid (temp->page->left, prev->latch->page_no);
- temp->latch->dirty = 1;
-
- bt_unlockpage (BtLockWrite, temp->latch);
- bt_unpinlatch (mgr, temp->latch);
- } else { // master is now the far right page
- bt_mutexlock (mgr->lock);
- bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
- bt_releasemutex(mgr->lock);
- }
-
- // now that there are no pointers to the right page
- // we can delete it after the last read access occurs
-
- bt_unlockpage (BtLockWrite, right->latch);
- bt_unlockpage (BtLockAtomic, right->latch);
- bt_lockpage (BtLockDelete, right->latch, thread_no);
- bt_lockpage (BtLockWrite, right->latch, thread_no);
- bt_freepage (mgr, right);
- return 0;
+ return keycmp (key1, key2->key, key2->len);
}
-
// atomic modification of a batch of keys.
-// return -1 if BTERR is set
-// otherwise return slot number
-// causing the key constraint violation
-// or zero on successful completion.
-
BTERR bt_atomictxn (BtDb *bt, BtPage source)
{
uint src, idx, slot, samepage, entry, que = 0;
-AtomicKey *head, *tail, *leaf;
-BtPageSet set[1], prev[1];
-unsigned char value[BtId];
BtKey *key, *ptr, *key2;
-BtLatchSet *latch;
-AtomicTxn *locks;
int result = 0;
BtSlot temp[1];
logseqno lsn;
-BtPage page;
-BtVal *val;
-uid right;
int type;
- locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
- head = NULL;
- tail = NULL;
-
// stable sort the list of keys into order to
// prevent deadlocks between threads.
-
+/*
for( src = 1; src++ < source->cnt; ) {
*temp = *slotptr(source,src);
key = keyptr (source,src);
break;
}
}
-
+*/
+ qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
// add entries to redo log
- if( bt->mgr->redopages )
- if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
- for( src = 0; src++ < source->cnt; )
- locks[src].lsn = lsn;
- else
- return bt->mgr->err;
+ if( bt->mgr->pagezero->redopages )
+ lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
+ else
+ lsn = 0;
+
+ // perform the individual actions in the transaction
+
+ if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
+ return bt->mgr->err;
+
+ // if number of active pages
+ // is greater than the buffer pool
+ // promote page into larger btree
+
+ if( bt->main )
+ while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
+ if( bt_txnpromote (bt) )
+ return bt->mgr->err;
+
+ // return success
+
+ return 0;
+}
+
+BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
+{
+uint src, idx, slot, samepage, entry, que = 0;
+BtPageSet set[1], prev[1];
+unsigned char value[BtId];
+BtLatchSet *latch;
+AtomicTxn *locks;
+BtKey *key, *ptr;
+BtPage page;
+BtVal *val;
+uid right;
+
+ locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
// Load the leaf page for each key
// group same page references with reuse bit
if( samepage = src > 1 )
if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
slot = bt_findslot(set->page, key->key, key->len);
- else
- bt_unlockpage(BtLockRead, set->latch);
if( !slot )
- if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
+ if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockAtomic, thread_no) )
set->latch->split = 0;
else
- return bt->mgr->err;
+ return mgr->err;
if( slotptr(set->page, slot)->type == Librarian )
ptr = keyptr(set->page, ++slot);
ptr = keyptr(set->page, slot);
if( !samepage ) {
- locks[src].entry = set->latch->entry;
+ locks[src].entry = set->latch - mgr->latchsets;
locks[src].slot = slot;
locks[src].reuse = 0;
} else {
locks[src].reqlsn = set->page->lsn;
}
- // unlock last loadpage lock
-
- if( source->cnt )
- bt_unlockpage(BtLockRead, set->latch);
-
// obtain write lock for each master page
- // sync flushed pages to disk
for( src = 0; src++ < source->cnt; ) {
if( locks[src].reuse )
continue;
- set->latch = bt->mgr->latchsets + locks[src].entry;
- bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
+ set->latch = mgr->latchsets + locks[src].entry;
+ bt_lockpage (BtLockWrite, set->latch, thread_no);
}
// insert or delete each key
// process any splits or merges
- // release Write & Atomic latches
- // set ParentModifications and build
- // queue of keys to insert for split pages
- // or delete for deleted pages.
-
// run through txn list backwards
samepage = source->cnt + 1;
for( idx = src; idx < samepage; idx++ )
switch( slotptr(source,idx)->type ) {
case Delete:
- if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
- return bt->mgr->err;
+ if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
+ return mgr->err;
break;
case Duplicate:
case Unique:
- if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
- return bt->mgr->err;
+ if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
+ return mgr->err;
+ break;
+
+ default:
+ bt_atomicpage (mgr, source, locks, idx, set);
break;
}
// after the same page operations have finished,
// process master page for splits or deletion.
- latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
- prev->page = bt_mappage (bt->mgr, prev->latch);
+ latch = prev->latch = mgr->latchsets + locks[src].entry;
+ prev->page = bt_mappage (mgr, prev->latch);
samepage = src;
// pick-up all splits from master page
- // each one is already WriteLocked.
+ // each one is already pinned & WriteLocked.
- entry = prev->latch->split;
-
- while( entry ) {
- set->latch = bt->mgr->latchsets + entry;
- set->page = bt_mappage (bt->mgr, set->latch);
- entry = set->latch->split;
+ if( entry = prev->latch->split ) do {
+ set->latch = mgr->latchsets + entry;
+ set->page = bt_mappage (mgr, set->latch);
// delete empty master page by undoing its split
// (this is potentially another empty page)
- // note that there are no new left pointers yet
+ // note that there are no pointers to it yet
if( !prev->page->act ) {
memcpy (set->page->left, prev->page->left, BtId);
- memcpy (prev->page, set->page, bt->mgr->page_size);
- bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
- bt_freepage (bt->mgr, set);
-
+ memcpy (prev->page, set->page, mgr->page_size);
+ bt_lockpage (BtLockDelete, set->latch, thread_no);
prev->latch->split = set->latch->split;
prev->latch->dirty = 1;
+ bt_freepage (mgr, set);
continue;
}
- // remove empty page from the split chain
- // and return it to the free list.
+ // remove empty split page from the split chain
+ // and return it to the free list. No other
+ // thread has its page number yet.
if( !set->page->act ) {
memcpy (prev->page->right, set->page->right, BtId);
prev->latch->split = set->latch->split;
- bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
- bt_freepage (bt->mgr, set);
+
+ bt_lockpage (BtLockDelete, set->latch, thread_no);
+ bt_freepage (mgr, set);
continue;
}
- // schedule prev fence key update
+ // update prev's fence key
ptr = keyptr(prev->page,prev->page->cnt);
- leaf = calloc (sizeof(AtomicKey), 1), que++;
+ bt_putid (value, prev->latch->page_no);
- memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
- leaf->page_no = prev->latch->page_no;
- leaf->entry = prev->latch->entry;
- leaf->type = 0;
-
- if( tail )
- tail->next = leaf;
- else
- head = leaf;
-
- tail = leaf;
+ if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
+ return mgr->err;
// splice in the left link into the split page
bt_putid (set->page->left, prev->latch->page_no);
- bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
+
+ if( lsm )
+ bt_syncpage (mgr, prev->page, prev->latch);
+
+ // page is unpinned below to avoid bt_txnpromote
+
bt_unlockpage(BtLockWrite, prev->latch);
*prev = *set;
- }
+ } while( entry = prev->latch->split );
// update left pointer in next right page from last split page
- // (if all splits were reversed, latch->split == 0)
+ // (if all splits were reversed or none occurred, latch->split == 0)
if( latch->split ) {
// fix left pointer in master's original (now split)
// far right sibling or set rightmost page in page zero
if( right = bt_getid (prev->page->right) ) {
- if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
- set->page = bt_mappage (bt->mgr, set->latch);
+ if( set->latch = bt_pinlatch (mgr, right, NULL, thread_no) )
+ set->page = bt_mappage (mgr, set->latch);
else
- return bt->mgr->err;
+ return mgr->err;
- bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
+ bt_lockpage (BtLockWrite, set->latch, thread_no);
bt_putid (set->page->left, prev->latch->page_no);
set->latch->dirty = 1;
+
bt_unlockpage (BtLockWrite, set->latch);
- bt_unpinlatch (bt->mgr, set->latch);
+ bt_unpinlatch (mgr, set->latch);
} else { // prev is rightmost page
- bt_mutexlock (bt->mgr->lock);
- bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
- bt_releasemutex(bt->mgr->lock);
+ bt_mutexlock (mgr->lock);
+ bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
+ bt_releasemutex(mgr->lock);
}
// Process last page split in chain
+ // by switching the key from the master
+ // page to the last split.
ptr = keyptr(prev->page,prev->page->cnt);
- leaf = calloc (sizeof(AtomicKey), 1), que++;
-
- memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
- leaf->page_no = prev->latch->page_no;
- leaf->entry = prev->latch->entry;
- leaf->type = 0;
-
- if( tail )
- tail->next = leaf;
- else
- head = leaf;
+ bt_putid (value, prev->latch->page_no);
- tail = leaf;
+ if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
+ return mgr->err;
- bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
bt_unlockpage(BtLockWrite, prev->latch);
- // remove atomic lock on master page
+ if( lsm )
+ bt_syncpage (mgr, prev->page, prev->latch);
bt_unlockpage(BtLockAtomic, latch);
+ bt_unpinlatch(mgr, latch);
+
+ // go through the list of splits and
+ // release the latch pins
+
+ while( entry = latch->split ) {
+ latch = mgr->latchsets + entry;
+ bt_unpinlatch(mgr, latch);
+ }
+
continue;
}
- // finished if prev page occupied (either master or final split)
+ // since there are no splits, we're
+ // finished if master page occupied
if( prev->page->act ) {
- bt_unlockpage(BtLockWrite, latch);
- bt_unlockpage(BtLockAtomic, latch);
- bt_unpinlatch(bt->mgr, latch);
+ bt_unlockpage(BtLockAtomic, prev->latch);
+ bt_unlockpage(BtLockWrite, prev->latch);
+
+ if( lsm )
+ bt_syncpage (mgr, prev->page, prev->latch);
+
+ bt_unpinlatch(mgr, prev->latch);
continue;
}
// any and all splits were reversed, and the
// master page located in prev is empty, delete it
- // by pulling over master's right sibling.
-
- // Remove empty master's fence key
-
- ptr = keyptr(prev->page,prev->page->cnt);
-
- if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
- return bt->mgr->err;
- // perform the remainder of the delete
- // from the FIFO queue
-
- leaf = calloc (sizeof(AtomicKey), 1), que++;
-
- memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
- leaf->page_no = prev->latch->page_no;
- leaf->entry = prev->latch->entry;
- leaf->nounlock = 1;
- leaf->type = 2;
-
- if( tail )
- tail->next = leaf;
- else
- head = leaf;
-
- tail = leaf;
-
- // leave atomic lock in place until
- // deletion completes in next phase.
-
- bt_unlockpage(BtLockWrite, prev->latch);
+ if( bt_deletepage (mgr, prev, thread_no, 1) )
+ return mgr->err;
}
- // add & delete keys for any pages split or merged during transaction
-
- if( leaf = head )
- do {
- set->latch = bt->mgr->latchsets + leaf->entry;
- set->page = bt_mappage (bt->mgr, set->latch);
-
- bt_putid (value, leaf->page_no);
- ptr = (BtKey *)leaf->leafkey;
-
- switch( leaf->type ) {
- case 0: // insert key
- if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) )
- return bt->mgr->err;
-
- break;
-
- case 1: // delete key
- if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
- return bt->mgr->err;
-
- break;
-
- case 2: // free page
- if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
- return bt->mgr->err;
-
- break;
- }
-
- if( !leaf->nounlock )
- bt_unlockpage (BtLockParent, set->latch);
-
- bt_unpinlatch (bt->mgr, set->latch);
- tail = leaf->next;
- free (leaf);
- } while( leaf = tail );
-
- // if number of active pages
- // is greater than the buffer pool
- // promote page into larger btree
-
- while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
- if( bt_txnpromote (bt) )
- return bt->mgr->err;
-
- // return success
-
free (locks);
return 0;
}
continue;
set->latch = bt->mgr->latchsets + entry;
- idx = set->latch->page_no % bt->mgr->latchhash;
if( !bt_mutextry(set->latch->modify) )
continue;
-// if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
-// bt_releasemutex(set->latch->modify);
-// continue;
-// }
-
// skip this entry if it is pinned
if( set->latch->pin & ~CLOCK_bit ) {
bt_releasemutex(set->latch->modify);
-// bt_releasemutex(bt->mgr->hashtable[idx].latch);
continue;
}
if( !set->latch->page_no || !bt_getid (set->page->right) ) {
bt_releasemutex(set->latch->modify);
-// bt_releasemutex(bt->mgr->hashtable[idx].latch);
continue;
}
- bt_lockpage (BtLockAccess, set->latch, bt->thread_no);
- bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
- bt_unlockpage(BtLockAccess, set->latch);
-
- // entry interiour node or being or was killed
+ // entry interiour node or being killed or promoted
if( set->page->lvl || set->page->free || set->page->kill ) {
bt_releasemutex(set->latch->modify);
-// bt_releasemutex(bt->mgr->hashtable[idx].latch);
- bt_unlockpage(BtLockWrite, set->latch);
continue;
}
// pin the page for our useage
set->latch->pin++;
+ set->latch->promote = 1;
bt_releasemutex(set->latch->modify);
-// bt_releasemutex(bt->mgr->hashtable[idx].latch);
- // if page is dirty, then
- // sync it to the disk first.
+ bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
- if( set->latch->dirty )
- if( bt->mgr->err = bt_writepage (bt->mgr, set->page, set->latch->page_no, 1) )
- return bt->mgr->line = __LINE__, bt->mgr->err;
- else
- set->latch->dirty = 0;
+ // remove the key for the page
+ // and wait for other threads to fade away
- // transfer slots in our selected page to larger btree
-if( !(set->latch->page_no % 100) )
-fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt);
-
- for( slot = 0; slot++ < set->page->cnt; ) {
- ptr = keyptr (set->page, slot);
- val = valptr (set->page, slot);
- node = slotptr(set->page, slot);
+ ptr = keyptr(set->page, set->page->cnt);
- switch( node->type ) {
- case Duplicate:
- case Unique:
- if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) )
- return bt->main->err;
+ if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
+ return bt->mgr->err;
- continue;
+ bt_unlockpage (BtLockWrite, set->latch);
+while( (set->latch->pin & ~CLOCK_bit) > 1 )
+sched_yield();
+ bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
+ bt_lockpage (BtLockAtomic, set->latch, bt->thread_no);
+ bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
- case Delete:
- if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) )
- return bt->main->err;
+ // transfer slots in our selected page to larger btree
+if( !(set->latch->page_no % 100) )
+fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
- continue;
- }
- }
+ if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
+ return bt->main->err;
// now delete the page
- if( bt_deletepage (bt->mgr, set, bt->thread_no) )
- return bt->mgr->err;
-
- bt_unpinlatch (bt->mgr, set->latch);
- return 0;
+ bt_unlockpage (BtLockDelete, set->latch);
+ bt_unlockpage (BtLockAtomic, set->latch);
+ return bt_deletepage (bt->mgr, set, bt->thread_no, 0);
}
}
posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
#endif
fprintf(stderr, "started counting\n");
- next = LEAF_page + bt->mgr->redopages + 1;
+ next = REDO_page + bt->mgr->pagezero->redopages;
while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
if( bt_readpage (bt->mgr, bt->cursor, page_no) )
break;
}
- fprintf(stderr, "%d reads %d writes %d found\n", bt->mgr->reads, bt->mgr->writes, bt->mgr->found);
bt_close (bt);
#ifdef unix
return NULL;
HANDLE *threads;
#endif
ThreadArg *args;
+uint redopages = 0;
uint poolsize = 0;
-uint recovery = 0;
uint mainpool = 0;
uint mainbits = 0;
float elapsed;
num = atoi(argv[6]);
if( argc > 7 )
- recovery = atoi(argv[7]);
+ redopages = atoi(argv[7]);
+
+ if( redopages + REDO_page > 65535 )
+ fprintf (stderr, "Warning: Recovery buffer too large\n");
if( argc > 8 )
mainbits = atoi(argv[8]);
#endif
args = malloc ((cnt + 1) * sizeof(ThreadArg));
- mgr = bt_mgr (argv[1], bits, poolsize, recovery);
+ mgr = bt_mgr (argv[1], bits, poolsize, redopages);
if( !mgr ) {
fprintf(stderr, "Index Open Error %s\n", argv[1]);
exit (1);
}
- main = bt_mgr (argv[2], mainbits, mainpool, 0);
+ if( mainbits ) {
+ main = bt_mgr (argv[2], mainbits, mainpool, 0);
- if( !main ) {
- fprintf(stderr, "Index Open Error %s\n", argv[2]);
- exit (1);
- }
+ if( !main ) {
+ fprintf(stderr, "Index Open Error %s\n", argv[2]);
+ exit (1);
+ }
+ } else
+ main = NULL;
// fire off threads
CloseHandle(threads[idx]);
#endif
bt_poolaudit(mgr);
- bt_poolaudit(main);
- bt_mgrclose (main);
+ if( main )
+ bt_poolaudit(main);
+
+ fprintf(stderr, "%d reads %d writes %d found\n", mgr->reads, mgr->writes, mgr->found);
+
+ if( main )
+ bt_mgrclose (main);
bt_mgrclose (mgr);
elapsed = getCpuTime(0) - start;