-// btree version threadskv9 sched_yield version
+// btree version threadskv9c FUTEX version
// with reworked bt_deletekey code,
// phase-fair reader writer lock,
// librarian page split code,
// ACID batched key-value updates
// and redo log for failure recovery
-// 01 OCT 2014
+// 07 OCT 2014
// author: karl malbrain, malbrain@cal.berkeley.edu
#ifdef linux
#define _GNU_SOURCE
#include <linux/futex.h>
#include <sys/syscall.h>
#ifndef SYS_futex
#define SYS_futex 202	// fallback: x86_64 syscall number
#endif
#endif
#ifdef unix
#include <sys/mman.h>
#include <errno.h>
#include <pthread.h>
#include <limits.h>
#else
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
ushort dup;
} RWLock;
-// write only queue lock
+// write only lock
typedef struct {
- volatile ushort ticket[1];
- volatile ushort serving[1];
+ volatile uint exclusive[1];
ushort tid;
ushort dup;
} WOLock;
#define MASK 0x3
#define RINC 0x4
-// definition for spin latch implementation
+// lite weight mutex
// exclusive is set for write access
-// share is count of read accessors
-// grant write lock when share == 0
-volatile typedef struct {
- ushort exclusive:1;
- ushort pending:1;
- ushort share:14;
-} BtSpinLatch;
+typedef struct {
+ volatile uint exclusive[1];
+} BtMutexLatch;
#define XCL 1
-#define PEND 2
-#define BOTH 3
-#define SHARE 4
+
+// mode & definition for lite latch implementation
+
+enum {
+ QueRd = 1, // reader queue
+ QueWr = 2 // writer queue
+} RWQueue;
// hash table entries
typedef struct {
- volatile uint slot; // Latch table entry at head of chain
- BtSpinLatch latch[1];
+ volatile uint entry; // Latch table entry at head of chain
+ BtMutexLatch latch[1];
} BtHashEntry;
// latch manager table structure
typedef struct {
- uid page_no; // latch set page number
- RWLock readwr[1]; // read/write page lock
- RWLock access[1]; // Access Intent/Page delete
- WOLock parent[1]; // Posting of fence key in parent
- WOLock atomic[1]; // Atomic update in progress
- uint split; // right split page atomic insert
- uint entry; // entry slot in latch table
- uint next; // next entry in hash table chain
- uint prev; // prev entry in hash table chain
- volatile ushort pin; // number of outstanding threads
- ushort dirty:1; // page in cache is dirty
+ uid page_no; // latch set page number
+ RWLock readwr[1]; // read/write page lock
+ RWLock access[1]; // Access Intent/Page delete
+ WOLock parent[1]; // Posting of fence key in parent
+ WOLock atomic[1]; // Atomic update in progress
+ uint split; // right split page atomic insert
+ uint entry; // entry slot in latch table
+ uint next; // next entry in hash table chain
+ uint prev; // prev entry in hash table chain
+ BtMutexLatch modify[1]; // modify entry lite latch
+ volatile ushort pin; // number of accessing threads
+ volatile unsigned char dirty; // page in cache is dirty (atomic set)
+ volatile unsigned char avail; // page is an available entry
} BtLatchSet;
// Define the length of the page record numbers
unsigned char right[BtId]; // page number to right
unsigned char left[BtId]; // page number to left
unsigned char filler[2]; // padding to multiple of 8
- logseqno lsn; // last LogSeqNo applied to page
+ logseqno lsn; // log sequence number applied
} *BtPage;
// The loadpage interface object
BtLatchSet *latchsets; // mapped latch set from buffer pool
unsigned char *pagepool; // mapped to the buffer pool pages
unsigned char *redobuff; // mapped recovery buffer pointer
- logseqno flushlsn; // first lsn flushed w/msync
- BtSpinLatch redo[1]; // redo area lite latch
- BtSpinLatch lock[1]; // allocation area lite latch
+ logseqno lsn, flushlsn; // current & first lsn flushed
+ BtMutexLatch dump[1]; // redo dump lite latch
+ BtMutexLatch redo[1]; // redo area lite latch
+ BtMutexLatch lock[1]; // allocation area lite latch
ushort thread_no[1]; // next thread number
- uint latchdeployed; // highest number of latch entries deployed
uint nlatchpage; // number of latch pages at BT_latch
uint latchtotal; // number of page latch entries
uint latchhash; // number of latch hash table slots
uint latchvictim; // next latch entry to examine
+ uint latchavail; // next available latch entry
+ uint availlock[1]; // latch available chain commitments
+ uint available; // size of latch available chain
uint redopages; // size of recovery buff in pages
uint redoend; // eof/end element in recovery buff
#ifndef unix
unsigned char key[BT_keyarray]; // last found complete key
int found; // last delete or insert was found
int err; // last error
+ int line; // last error line no
int reads, writes; // number of reads and writes from the btree
ushort thread_no; // thread number
} BtDb;
BTERR_read,
BTERR_wrt,
BTERR_atomic,
- BTERR_recovery
+ BTERR_recovery,
+ BTERR_avail
} BTERR;
#define CLOCK_bit 0x8000
BTRM_del, // delete a key-value from btree
BTRM_upd, // update a key with a new value
BTRM_new, // allocate a new empty page
- BTRM_old, // reuse an old empty page
- BTRM_end = 255 // circular buffer inter-gap
+ BTRM_old // reuse an old empty page
} BTRM;
// recovery manager entry
// structure followed by BtKey & BtVal
typedef struct {
+ logseqno reqlsn; // log sequence number required
logseqno lsn; // log sequence number for entry
uint len; // length of entry
unsigned char type; // type of entry
extern void bt_close (BtDb *bt);
extern BtDb *bt_open (BtMgr *mgr);
-extern void bt_flushlsn (BtDb *bt);
+extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no);
+extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no);
extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
extern void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch);
extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages);
extern void bt_mgrclose (BtMgr *mgr);
extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val);
+extern logseqno bt_txnredo (BtDb *bt, BtPage page);
// Helper functions to return slot values
// from the cursor page.
//	obtain write-only lock
//	re-entrant for the thread that already owns it

void WriteOLock (WOLock *lock, ushort tid)
{
uint prev;

	// already held by this thread:
	// just count the recursion

	if( lock->tid == tid ) {
		lock->dup++;
		return;
	}

	while( 1 ) {
#ifdef unix
		prev = __sync_fetch_and_or (lock->exclusive, XCL);
#else
		prev = _InterlockedOr (lock->exclusive, XCL);
#endif
		// exclusive bit was clear: the lock is ours

		if( !(prev & XCL) ) {
			lock->tid = tid;
			return;
		}

		// sleep until the lock word changes.  sys_futex and the
		// FUTEX_* constants exist only on linux, so other unix
		// systems yield the cpu instead of busy-spinning.
#ifdef linux
		sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
#elif defined(unix)
		sched_yield ();
#else
		SwitchToThread ();
#endif
	}
}
void WriteORelease (WOLock *lock)
return;
}
+ *lock->exclusive = 0;
lock->tid = 0;
- lock->serving[0]++;
+#ifdef linux
+ sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
+#endif
}
// Phase-Fair reader/writer lock implementation
lock->serving[0]++;
}
+// try to obtain read lock
+// return 1 if successful
+
+int ReadTry (RWLock *lock, ushort tid)
+{
+ushort w;
+
+ // OK if write lock already held by same thread
+
+ if( lock->tid == tid ) {
+ lock->dup++;
+ return 1;
+ }
+#ifdef unix
+ w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
+#else
+ w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
+#endif
+ if( w )
+ if( w == (*lock->rin & MASK) ) {
+#ifdef unix
+ __sync_fetch_and_add (lock->rin, -RINC);
+#else
+ _InterlockedExchangeAdd16 (lock->rin, -RINC);
+#endif
+ return 0;
+ }
+
+ return 1;
+}
+
void ReadLock (RWLock *lock, ushort tid)
{
ushort w;
#endif
}
// lite weight spin lock Latch Manager

// thin wrapper over the raw futex system call

int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
{
long result;

	result = syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
	return (int)result;
}
-// wait for other read and write latches to relinquish
-
-void bt_spinwritelock(BtSpinLatch *latch)
+void bt_mutexlock(BtMutexLatch *latch)
{
-ushort prev;
+uint prev;
- do {
+ while( 1 ) {
#ifdef unix
- prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
+ prev = __sync_fetch_and_or(latch->exclusive, XCL);
#else
- prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
+ prev = _InterlockedOr(latch->exclusive, XCL);
#endif
if( !(prev & XCL) )
- if( !(prev & ~BOTH) )
return;
- else
-#ifdef unix
- __sync_fetch_and_and ((ushort *)latch, ~XCL);
-#else
- _InterlockedAnd16((ushort *)latch, ~XCL);
-#endif
#ifdef unix
- } while( sched_yield(), 1 );
+ sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
#else
- } while( SwitchToThread(), 1 );
+ SwitchToThread();
#endif
+ }
}
// try to obtain write lock
// return 1 if obtained,
// 0 otherwise
-int bt_spinwritetry(BtSpinLatch *latch)
+int bt_mutextry(BtMutexLatch *latch)
{
-ushort prev;
+uint prev;
#ifdef unix
- prev = __sync_fetch_and_or((ushort *)latch, XCL);
+ prev = __sync_fetch_and_or(latch->exclusive, XCL);
#else
- prev = _InterlockedOr16((ushort *)latch, XCL);
+ prev = _InterlockedOr(latch->exclusive, XCL);
#endif
- // take write access if all bits are clear
+ // take write access if exclusive bit is clear
- if( !(prev & XCL) )
- if( !(prev & ~BOTH) )
- return 1;
- else
-#ifdef unix
- __sync_fetch_and_and ((ushort *)latch, ~XCL);
-#else
- _InterlockedAnd16((ushort *)latch, ~XCL);
-#endif
- return 0;
+ return !(prev & XCL);
}
// clear write mode
-void bt_spinreleasewrite(BtSpinLatch *latch)
+void bt_releasemutex(BtMutexLatch *latch)
{
+ *latch->exclusive = 0;
#ifdef unix
- __sync_fetch_and_and((ushort *)latch, ~BOTH);
-#else
- _InterlockedAnd16((ushort *)latch, ~BOTH);
+ sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
#endif
}
// recovery manager -- flush dirty pages
//	sweeps the whole buffer pool writing out dirty pages so
//	the btree file catches up with the redo recovery buffer.
//	NOTE(review): called after bt_dumpredo with the dump latch
//	held by the caller -- confirm against call sites.

void bt_flushlsn (BtDb *bt)
{
uint cnt3 = 0, cnt2 = 0, cnt = 0;	// debug stats: available / pinned / flushed
BtLatchSet *latch;
BtPage page;
uint entry;

	// flush dirty pool pages to the btree that were
	// dirty before the start of this redo recovery buffer
fprintf(stderr, "Start flushlsn\n");
	for( entry = 1; entry < bt->mgr->latchtotal; entry++ ) {
		page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
		latch = bt->mgr->latchsets + entry;

		// hold the entry's modify latch and a read lock on the
		// page so the image written to disk is consistent

		bt_mutexlock (latch->modify);
		bt_lockpage(bt, BtLockRead, latch);

		if( latch->dirty ) {
			bt_writepage(bt->mgr, page, latch->page_no);
			latch->dirty = 0, bt->writes++, cnt++;
		}
// debug statistics gathered during the sweep
if( latch->avail )
cnt3++;
if( latch->pin & ~CLOCK_bit )
cnt2++;
		bt_unlockpage(bt, BtLockRead, latch);
		bt_releasemutex (latch->modify);
	}
fprintf(stderr, "End flushlsn %d pages %d pinned %d available\n", cnt, cnt2, cnt3);
}
+
+// recovery manager -- process current recovery buff on startup
+// this won't do much if previous session was properly closed.
+
+BTERR bt_recoveryredo (BtMgr *mgr)
+{
+BtLogHdr *hdr, *eof;
+uint offset = 0;
+BtKey *key;
+BtVal *val;
+
+ pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size);
+
+ hdr = (BtLogHdr *)mgr->redobuff;
+ mgr->flushlsn = hdr->lsn;
+
+ while( 1 ) {
+ hdr = (BtLogHdr *)(mgr->redobuff + offset);
+ switch( hdr->type ) {
+ case BTRM_eof:
+ mgr->lsn = hdr->lsn;
+ return 0;
+ case BTRM_add: // add a unique key-value to btree
+
+ case BTRM_dup: // add a duplicate key-value to btree
+ case BTRM_del: // delete a key-value from btree
+ case BTRM_upd: // update a key with a new value
+ case BTRM_new: // allocate a new empty page
+ case BTRM_old: // reuse an old empty page
+ return 0;
+ }
+ }
}
// recovery manager -- dump current recovery buff & flush dirty pages
// in preparation for next recovery buffer.
//	NOTE(review): callers hold the dump latch (bt_newredo and
//	bt_txnredo also hold the redo latch) -- confirm at call sites.

BTERR bt_dumpredo (BtDb *bt)
{
BtLogHdr *eof;

fprintf(stderr, "Flush pages ");
	// terminate the buffer with an empty eof record

	eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
	memset (eof, 0, sizeof(BtLogHdr));

	// flush pages written at beginning of this redo buffer
	// then write the redo buffer out to disk

	fdatasync (bt->mgr->idx);

fprintf(stderr, "Dump ReDo: %d bytes\n", bt->mgr->redoend);
	pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits);

	sync_file_range (bt->mgr->idx, REDO_page << bt->mgr->page_bits, bt->mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);

	// everything through the current lsn is now durable;
	// reset the buffer for the next batch

	bt->mgr->flushlsn = bt->mgr->lsn;
	bt->mgr->redoend = 0;

	// restart the buffer with an eof record carrying the lsn

	eof = (BtLogHdr *)(bt->mgr->redobuff);
	memset (eof, 0, sizeof(BtLogHdr));
	eof->lsn = bt->mgr->lsn;
	return 0;
}
BtLogHdr *hdr, *eof;
uint flush;
- bt_spinwritelock (bt->mgr->redo);
+ bt_mutexlock (bt->mgr->redo);
if( key )
amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
// see if new entry fits in buffer
// flush and reset if it doesn't
- if( flush = amt > size - bt->mgr->redoend )
+ if( flush = amt > size - bt->mgr->redoend ) {
+ bt_mutexlock (bt->mgr->dump);
+
if( bt_dumpredo (bt) )
return 0;
+ }
// fill in new entry & either eof or end block
hdr->len = amt;
hdr->type = type;
hdr->lvl = lvl;
- hdr->lsn = ++bt->mgr->pagezero->alloc->lsn;
+ hdr->lsn = ++bt->mgr->lsn;
bt->mgr->redoend += amt;
memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
}
- bt_spinreleasewrite(bt->mgr->redo);
+ bt_releasemutex(bt->mgr->redo);
- if( flush )
+ if( flush ) {
bt_flushlsn (bt);
+ bt_releasemutex(bt->mgr->dump);
+ }
return hdr->lsn;
}
+// recovery manager -- append transaction to recovery log
+// flush to disk when it overflows.
+
+logseqno bt_txnredo (BtDb *bt, BtPage source)
+{
+uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr);
+uint amt = 0, src, type;
+BtLogHdr *hdr, *eof;
+logseqno lsn;
+uint flush;
+BtKey *key;
+BtVal *val;
+
+ // determine amount of redo recovery log space required
+
+ for( src = 0; src++ < source->cnt; ) {
+ key = keyptr(source,src);
+ val = valptr(source,src);
+ amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
+ amt += sizeof(BtLogHdr);
+ }
+
+ bt_mutexlock (bt->mgr->redo);
+
+ // see if new entry fits in buffer
+ // flush and reset if it doesn't
+
+ if( flush = amt > size - bt->mgr->redoend ) {
+ bt_mutexlock (bt->mgr->dump);
+
+ if( bt_dumpredo (bt) )
+ return 0;
+ }
+
+ // assign new lsn to transaction
+
+ lsn = ++bt->mgr->lsn;
+
+ // fill in new entries
+
+ for( src = 0; src++ < source->cnt; ) {
+ key = keyptr(source, src);
+ val = valptr(source, src);
+
+ switch( slotptr(source, src)->type ) {
+ case Unique:
+ type = BTRM_add;
+ break;
+ case Duplicate:
+ type = BTRM_dup;
+ break;
+ case Delete:
+ type = BTRM_del;
+ break;
+ case Update:
+ type = BTRM_upd;
+ break;
+ }
+
+ amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
+ amt += sizeof(BtLogHdr);
+
+ hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
+ hdr->len = amt;
+ hdr->type = type;
+ hdr->lsn = lsn;
+ hdr->lvl = 0;
+
+ // fill in key and value
+
+ memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
+ memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
+
+ bt->mgr->redoend += amt;
+ }
+
+ eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend);
+ memset (eof, 0, sizeof(BtLogHdr));
+
+ bt_releasemutex(bt->mgr->redo);
+
+ if( flush ) {
+ bt_flushlsn (bt);
+ bt_releasemutex(bt->mgr->dump);
+ }
+
+ return lsn;
+}
+
// read page into buffer pool from permanent location in Btree file
BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
#ifdef unix
if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
- fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
+ fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno);
return BTERR_read;
}
#else
ovl->OffsetHigh = off >> 32;
if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
- fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
+ fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError());
return BTERR_read;
}
if( *amt < mgr->page_size ) {
return 0;
}
-// link latch table entry into head of latch hash table
-
-BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
-{
-BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
-BtLatchSet *latch = bt->mgr->latchsets + slot;
-
- if( latch->next = bt->mgr->hashtable[hashidx].slot )
- bt->mgr->latchsets[latch->next].prev = slot;
-
- bt->mgr->hashtable[hashidx].slot = slot;
- latch->page_no = page_no;
- latch->entry = slot;
- latch->split = 0;
- latch->prev = 0;
- latch->pin = 1;
-
- if( loadit )
- if( bt->err = bt_readpage (bt->mgr, page, page_no) )
- return bt->err;
- else
- bt->reads++;
-
- return bt->err = 0;
-}
-
// set CLOCK bit in latch
// decrement pin count

void bt_unpinlatch (BtLatchSet *latch)
{
	bt_mutexlock(latch->modify);

	// give the entry a second chance against clock
	// eviction, then drop our pin

	latch->pin = (latch->pin | CLOCK_bit) - 1;

	bt_releasemutex(latch->modify);
}
// return the btree cached page address
{
BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
- if( latch->dirty )
- if( page->lsn < bt->mgr->flushlsn )
- if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
- return NULL;
- else
- latch->dirty = 0, bt->writes++;
-
return page;
}
// return next available latch entry
// and with latch entry locked
//	the entry's modify latch is HELD on return -- the caller
//	must release it.  loops until an available entry appears;
//	callers bound this via the availlock commitment counters.

uint bt_availnext (BtDb *bt)
{
BtLatchSet *latch;
uint entry;

	while( 1 ) {
		// advance the round-robin scan point
#ifdef unix
		entry = __sync_fetch_and_add (&bt->mgr->latchavail, 1) + 1;
#else
		entry = _InterlockedIncrement (&bt->mgr->latchavail);
#endif
		entry %= bt->mgr->latchtotal;

		// entry zero is reserved

		if( !entry )
			continue;

		latch = bt->mgr->latchsets + entry;

		// cheap unlocked test first

		if( !latch->avail )
			continue;

		bt_mutexlock(latch->modify);

		// re-check under the modify latch: another
		// thread may have claimed the entry meanwhile

		if( !latch->avail ) {
			bt_releasemutex(latch->modify);
			continue;
		}

		return entry;
	}
}
- // see if there are any unused pool entries
-#ifdef unix
- slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
-#else
- slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
-#endif
+// find and add the next available latch entry
+// to the queue
- if( slot < bt->mgr->latchtotal ) {
- latch = bt->mgr->latchsets + slot;
- if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
- return NULL;
- bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
- return latch;
- }
+BTERR bt_availlatch (BtDb *bt)
+{
+BtLatchSet *latch;
+uint startattempt;
+uint cnt, entry;
+uint hashidx;
+BtPage page;
-#ifdef unix
- __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
-#else
- _InterlockedDecrement (&bt->mgr->latchdeployed);
-#endif
// find and reuse previous entry on victim
+ startattempt = bt->mgr->latchvictim;
+
while( 1 ) {
#ifdef unix
- slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
+ entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
#else
- slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
+ entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
#endif
- // try to get write lock on hash chain
- // skip entry if not obtained
- // or has outstanding pins
+ // skip entry if it has outstanding pins
- slot %= bt->mgr->latchtotal;
+ entry %= bt->mgr->latchtotal;
- if( !slot )
+ if( !entry )
continue;
- // only go around two times before
+ // only go around one time before
// flushing redo recovery buffer,
- // and the buffer pool.
+ // and the buffer pool to free up entries.
if( bt->mgr->redopages )
- if( attempts++ > 2 * bt->mgr->latchtotal ) {
- if( bt_dumpredo (bt) )
- return NULL;
- bt_flushlsn (bt);
- attempts = 0;
+ if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) {
+ if( bt_mutextry (bt->mgr->dump) ) {
+ if( bt_dumpredo (bt) )
+ return bt->err;
+ bt_flushlsn (bt);
+ // synchronize the various threads running into this condition
+ // so that only one thread does the dump and flush
+ } else
+ bt_mutexlock(bt->mgr->dump);
+
+ startattempt = bt->mgr->latchvictim;
+ bt_releasemutex(bt->mgr->dump);
}
- latch = bt->mgr->latchsets + slot;
- idx = latch->page_no % bt->mgr->latchhash;
+ latch = bt->mgr->latchsets + entry;
- // see we are on same chain as hashidx
+ if( latch->avail )
+ continue;
- if( idx == hashidx )
- continue;
+ bt_mutexlock(latch->modify);
- if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
- continue;
+ // skip if already an available entry
+
+ if( latch->avail ) {
+ bt_releasemutex(latch->modify);
+ continue;
+ }
- // skip this slot if it is pinned
- // or the CLOCK bit is set
+ // skip this entry if it is pinned
+ // if the CLOCK bit is set
+ // reset it to zero.
if( latch->pin ) {
- if( latch->pin & CLOCK_bit ) {
-#ifdef unix
- __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
-#else
- _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
-#endif
- }
- bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
+ latch->pin &= ~CLOCK_bit;
+ bt_releasemutex(latch->modify);
continue;
}
- page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
+ page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
// if dirty page has lsn >= last redo recovery buffer
- // then hold page in the buffer pool until redo
- // recovery buffer is written to disk.
+ // then hold page in the buffer pool until next redo
+ // recovery buffer is being written to disk.
if( latch->dirty )
if( page->lsn >= bt->mgr->flushlsn ) {
- bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
+ bt_releasemutex(latch->modify);
continue;
}
-
- // update permanent page area in btree from buffer pool
- // no read-lock is required since page is not pinned.
- if( latch->dirty )
- if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
- return NULL;
- else
- latch->dirty = 0, bt->writes++;
-
- // unlink our available slot from its hash chain
+ // entry is available
+#ifdef unix
+ __sync_fetch_and_add (&bt->mgr->available, 1);
+#else
+ _InterlockedIncrement(&bt->mgr->available);
+#endif
+ latch->avail = 1;
+ bt_releasemutex(latch->modify);
+ return 0;
+ }
+}
- if( latch->prev )
- bt->mgr->latchsets[latch->prev].next = latch->next;
- else
- bt->mgr->hashtable[idx].slot = latch->next;
+// release available latch requests
- if( latch->next )
- bt->mgr->latchsets[latch->next].prev = latch->prev;
+void bt_availrelease (BtDb *bt, uint count)
+{
+#ifdef unix
+ __sync_fetch_and_add(bt->mgr->availlock, -count);
+#else
+ _InterlockedAdd(bt->mgr->availlock, -count);
+#endif
+}
- bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
+// commit available chain entries
+// find available entries as required
- if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
- return NULL;
+void bt_availrequest (BtDb *bt, uint count)
+{
+#ifdef unix
+ __sync_fetch_and_add(bt->mgr->availlock, count);
+#else
+ _InterlockedAdd(bt->mgr->availlock, count);
+#endif
- bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
- return latch;
- }
+ while( *bt->mgr->availlock > bt->mgr->available )
+ bt_availlatch (bt);
}
-// flush pages
+// find available latchset
+// return with latchset pinned
-void bt_flushlsn (BtDb *bt)
+BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, BtPage loadit)
{
+uint hashidx = page_no % bt->mgr->latchhash;
BtLatchSet *latch;
-uint hashidx;
-uint num = 0;
+uint entry, idx;
BtPage page;
-uint slot;
- // flush dirty pool pages to the btree that were
- // dirty before the start of this redo recovery buffer
+ // try to find our entry
- for( slot = 1; slot <= bt->mgr->latchdeployed; slot++ ) {
- page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
- latch = bt->mgr->latchsets + slot;
-// hashidx = latch->page_no % bt->mgr->latchhash;
+ bt_mutexlock(bt->mgr->hashtable[hashidx].latch);
-// if( !bt_spinwritetry (bt->mgr->hashtable[hashidx].latch) )
-// continue;
+ if( entry = bt->mgr->hashtable[hashidx].entry ) do
+ {
+ latch = bt->mgr->latchsets + entry;
+ if( page_no == latch->page_no )
+ break;
+ } while( entry = latch->next );
- if( latch->dirty ) {
- bt_lockpage(bt, BtLockRead, latch);
- bt_writepage(bt->mgr, page, latch->page_no);
- latch->dirty = 0, bt->writes++;
- bt_unlockpage(bt, BtLockRead, latch);
- }
+ // found our entry: increment pin
+ // remove from available status
-// bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
+ if( entry ) {
+ latch = bt->mgr->latchsets + entry;
+ bt_mutexlock(latch->modify);
+ if( latch->avail )
+#ifdef unix
+ __sync_fetch_and_add (&bt->mgr->available, -1);
+#else
+ _InterlockedDecrement(&bt->mgr->available);
+#endif
+ latch->avail = 0;
+ latch->pin |= CLOCK_bit;
+ latch->pin++;
+
+ bt_releasemutex(latch->modify);
+ bt_releasemutex(bt->mgr->hashtable[hashidx].latch);
+ return latch;
+ }
+
+ // find and reuse entry from available set
+
+trynext:
+
+ if( entry = bt_availnext (bt) )
+ latch = bt->mgr->latchsets + entry;
+ else
+ return bt->line = __LINE__, bt->err = BTERR_avail, NULL;
+
+ idx = latch->page_no % bt->mgr->latchhash;
+
+ // if latch is on a different hash chain
+ // unlink from the old page_no chain
+
+ if( latch->page_no )
+ if( idx != hashidx ) {
+
+ // skip over this entry if latch not available
+
+ if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) {
+ bt_releasemutex(latch->modify);
+ goto trynext;
}
-}
+ if( latch->prev )
+ bt->mgr->latchsets[latch->prev].next = latch->next;
+ else
+ bt->mgr->hashtable[idx].entry = latch->next;
+
+ if( latch->next )
+ bt->mgr->latchsets[latch->next].prev = latch->prev;
+
+ bt_releasemutex (bt->mgr->hashtable[idx].latch);
+ }
+
+ // remove available status
+
+ latch->avail = 0;
+#ifdef unix
+ __sync_fetch_and_add (&bt->mgr->available, -1);
+#else
+ _InterlockedDecrement(&bt->mgr->available);
+#endif
+ page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool);
+
+ if( latch->dirty )
+ if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
+ return bt->line = __LINE__, NULL;
+ else
+ latch->dirty = 0, bt->writes++;
+
+ if( loadit ) {
+ memcpy (page, loadit, bt->mgr->page_size);
+ latch->dirty = 1;
+ } else
+ if( bt->err = bt_readpage (bt->mgr, page, page_no) )
+ return bt->line = __LINE__, NULL;
+ else
+ bt->reads++;
+
+ // link page as head of hash table chain
+ // if this is a never before used entry,
+ // or it was previously on a different
+ // hash table chain. Otherwise, just
+ // leave it in its current hash table
+ // chain position.
+
+ if( !latch->page_no || hashidx != idx ) {
+ if( latch->next = bt->mgr->hashtable[hashidx].entry )
+ bt->mgr->latchsets[latch->next].prev = entry;
+
+ bt->mgr->hashtable[hashidx].entry = entry;
+ latch->prev = 0;
+ }
+
+ // fill in latch structure
+
+ latch->pin = CLOCK_bit | 1;
+ latch->page_no = page_no;
+ latch->entry = entry;
+ latch->split = 0;
+
+ bt_releasemutex (latch->modify);
+ bt_releasemutex (bt->mgr->hashtable[hashidx].latch);
+ return latch;
+}
+
void bt_mgrclose (BtMgr *mgr)
{
BtLatchSet *latch;
BtPage page;
uint slot;
- // flush recovery buffer to disk
+ // flush previously written dirty pages
+ // and write recovery buffer to disk
+
+ fdatasync (mgr->idx);
if( mgr->redoend ) {
eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits);
}
- // flush dirty pool pages to the btree
+ // write remaining dirty pool pages to the btree
- for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
+ for( slot = 1; slot < mgr->latchtotal; slot++ ) {
page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
latch = mgr->latchsets + slot;
}
}
+ // flush last batch to disk and clear
+ // redo recovery buffer on disk.
+
+ fdatasync (mgr->idx);
+
+ eof = (BtLogHdr *)mgr->redobuff;
+ memset (eof, 0, sizeof(BtLogHdr));
+ eof->lsn = mgr->lsn;
+
+ pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
+
+ sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+
fprintf(stderr, "%d buffer pool pages flushed\n", num);
#ifdef unix
- munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
+ munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
munmap (mgr->pagezero, mgr->page_size);
#else
FlushViewOfFile(mgr->pagezero, 0);
UnmapViewOfFile(mgr->pagezero);
- UnmapViewOfFile(mgr->hashtable);
+ UnmapViewOfFile(mgr->pagepool);
CloseHandle(mgr->halloc);
CloseHandle(mgr->hpool);
#endif
unsigned char value[BtId];
int flag, initit = 0;
BtPageZero *pagezero;
+BtLatchSet *latch;
off64_t size;
uint amt[1];
BtMgr* mgr;
// calculate number of latch hash table entries
- mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
- mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
+ mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
mgr->nlatchpage += nodemax; // size of the buffer pool in pages
- mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
+ mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size;
mgr->latchtotal = nodemax;
if( !initit )
}
mlock (mgr->pagezero, mgr->page_size);
- mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
- if( mgr->hashtable == MAP_FAILED ) {
+ mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
+ if( mgr->pagepool == MAP_FAILED ) {
fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
return bt_mgrclose (mgr), NULL;
}
}
flag = FILE_MAP_WRITE;
- mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
- if( !mgr->hashtable ) {
+ mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size);
+ if( !mgr->pagepool ) {
fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
return bt_mgrclose (mgr), NULL;
}
mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE);
#endif
- mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
- mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
+ mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
+ mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
+ mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
+
+ // mark all pool entries as available
+
+ for( idx = 1; idx < mgr->latchtotal; idx++ ) {
+ latch = mgr->latchsets + idx;
+ latch->avail = 1;
+ mgr->available++;
+ }
return mgr;
}
// lock allocation page
- bt_spinwritelock(bt->mgr->lock);
+ bt_mutexlock(bt->mgr->lock);
// use empty chain first
// else allocate empty page
if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
- if( set->latch = bt_pinlatch (bt, page_no, 1) )
+ if( set->latch = bt_pinlatch (bt, page_no, NULL) )
set->page = bt_mappage (bt, set->latch);
else
- return bt->err = BTERR_struct, -1;
+ return bt->err = BTERR_struct, bt->line = __LINE__, -1;
bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
- bt_spinreleasewrite(bt->mgr->lock);
+ bt_releasemutex(bt->mgr->lock);
memcpy (set->page, contents, bt->mgr->page_size);
set->latch->dirty = 1;
page_no = bt_getid(bt->mgr->pagezero->alloc->right);
bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
- // unlock allocation latch
+ // unlock allocation latch and
+ // extend file into new page.
- bt_spinreleasewrite(bt->mgr->lock);
+ bt_releasemutex(bt->mgr->lock);
// don't load cache from btree page
- if( set->latch = bt_pinlatch (bt, page_no, 0) )
+ if( set->latch = bt_pinlatch (bt, page_no, contents) )
set->page = bt_mappage (bt, set->latch);
else
- return bt->err = BTERR_struct;
+ return bt->err;
- memcpy (set->page, contents, bt->mgr->page_size);
set->latch->dirty = 1;
return 0;
}
uint drill = 0xff, slot;
BtLatchSet *prevlatch;
uint mode, prevmode;
+BtVal *val;
// start at root of btree and drill down
// determine lock mode of drill level
mode = (drill == lvl) ? lock : BtLockRead;
- if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
+ if( !(set->latch = bt_pinlatch (bt, page_no, NULL)) )
return 0;
// obtain access lock using lock chaining with Access mode
bt_lockpage(bt, mode, set->latch);
if( set->page->free )
- return bt->err = BTERR_struct, 0;
+ return bt->err = BTERR_struct, bt->line = __LINE__, 0;
if( page_no > ROOT_page )
bt_unlockpage(bt, BtLockAccess, set->latch);
if( set->page->lvl != drill) {
if( set->latch->page_no != ROOT_page )
- return bt->err = BTERR_struct, 0;
+ return bt->err = BTERR_struct, bt->line = __LINE__, 0;
drill = set->page->lvl;
if( slot++ < set->page->cnt )
continue;
else
- return bt->err = BTERR_struct, 0;
+ return bt->err = BTERR_struct, bt->line = __LINE__, 0;
+
+ val = valptr(set->page, slot);
+
+ if( val->len == BtId )
+ page_no = bt_getid(valptr(set->page, slot)->value);
+ else
+ return bt->line = __LINE__, bt->err = BTERR_struct, 0;
- page_no = bt_getid(valptr(set->page, slot)->value);
drill--;
continue;
}
- // or slide right into next page
+ // slide right into next page
page_no = bt_getid(set->page->right);
} while( page_no );
// return error on end of right chain
- bt->err = BTERR_struct;
+ bt->line = __LINE__, bt->err = BTERR_struct;
return 0; // return error
}
{
// lock allocation page
- bt_spinwritelock (bt->mgr->lock);
+ bt_mutexlock (bt->mgr->lock);
// store chain
// unlock allocation page
- bt_spinreleasewrite (bt->mgr->lock);
+ bt_releasemutex (bt->mgr->lock);
}
// a fence key was deleted from a page
{
BtPageSet child[1];
uid page_no;
+BtVal *val;
uint idx;
// find the child entry and promote as new root contents
if( !slotptr(root->page, idx)->dead )
break;
- page_no = bt_getid (valptr(root->page, idx)->value);
+ val = valptr(root->page, idx);
- if( child->latch = bt_pinlatch (bt, page_no, 1) )
+ if( val->len == BtId )
+ page_no = bt_getid (valptr(root->page, idx)->value);
+ else
+ return bt->line = __LINE__, bt->err = BTERR_struct;
+
+ if( child->latch = bt_pinlatch (bt, page_no, NULL) )
child->page = bt_mappage (bt, child->latch);
else
return bt->err;
page_no = bt_getid(set->page->right);
- if( right->latch = bt_pinlatch (bt, page_no, 1) )
+ if( right->latch = bt_pinlatch (bt, page_no, NULL) )
right->page = bt_mappage (bt, right->latch);
else
return 0;
memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
if( right->page->kill )
- return bt->err = BTERR_struct;
+ return bt->line = __LINE__, bt->err = BTERR_struct;
// pull contents of right peer into our empty page
prevlatch = set->latch;
if( page_no = bt_getid(set->page->right) )
- if( set->latch = bt_pinlatch (bt, page_no, 1) )
+ if( set->latch = bt_pinlatch (bt, page_no, NULL) )
set->page = bt_mappage (bt, set->latch);
else
return 0;
else
- return bt->err = BTERR_struct, 0;
+ return bt->err = BTERR_struct, bt->line = __LINE__, 0;
// obtain access lock using lock chaining with Access mode
memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
set->latch->dirty = 1;
+
page->garbage = 0;
page->act = 0;
if( bt_newpage(bt, right, bt->frame) )
return 0;
+ // process lower keys
+
memcpy (bt->frame, set->page, bt->mgr->page_size);
memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
set->latch->dirty = 1;
ptr = keyptr(set->page, slot);
else {
if( !bt->err )
- bt->err = BTERR_ovflw;
+ bt->line = __LINE__, bt->err = BTERR_ovflw;
return bt->err;
}
}
typedef struct {
+ logseqno reqlsn; // redo log seq no required
logseqno lsn; // redo log sequence number
uint entry; // latch table entry number
uint slot:31; // page slot number
}
} while( entry = set->latch->split );
- bt->err = BTERR_atomic;
+ bt->line = __LINE__, bt->err = BTERR_atomic;
return 0;
}
locks[src].slot = 0;
}
- return bt->err = BTERR_atomic;
+ return bt->line = __LINE__, bt->err = BTERR_atomic;
}
BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
{
BtKey *key = keyptr(source, src);
-uint idx, entry, slot;
BtPageSet set[1];
+uint idx, slot;
BtKey *ptr;
BtVal *val;
if( slot = bt_atomicpage (bt, source, locks, src, set) )
ptr = keyptr(set->page, slot);
else
- return bt->err = BTERR_struct;
+ return bt->line = __LINE__, bt->err = BTERR_struct;
if( !keycmp (ptr, key->key, key->len) )
if( !slotptr(set->page, slot)->dead )
val = valptr(set->page, slot);
set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
- set->page->lsn = locks[src].lsn;
set->latch->dirty = 1;
+ set->page->lsn = locks[src].lsn;
set->page->act--;
bt->found++;
return 0;
// grab the right sibling
- if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
+ if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), NULL) )
right->page = bt_mappage (bt, right->latch);
else
return bt->err;
	// to remove scanner's pointer to the right page
if( right_page_no = bt_getid (prev->page->right) ) {
- if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
+ if( temp->latch = bt_pinlatch (bt, right_page_no, NULL) )
temp->page = bt_mappage (bt, temp->latch);
bt_lockpage (bt, BtLockWrite, temp->latch);
bt_unlockpage (bt, BtLockWrite, temp->latch);
bt_unpinlatch (temp->latch);
} else { // master is now the far right page
- bt_spinwritelock (bt->mgr->lock);
+ bt_mutexlock (bt->mgr->lock);
bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
- bt_spinreleasewrite(bt->mgr->lock);
+ bt_releasemutex(bt->mgr->lock);
}
// now that there are no pointers to the right page
int bt_atomictxn (BtDb *bt, BtPage source)
{
-uint src, idx, slot, samepage, entry;
+uint src, idx, slot, samepage, entry, avail, que = 0;
AtomicKey *head, *tail, *leaf;
BtPageSet set[1], prev[1];
unsigned char value[BtId];
AtomicTxn *locks;
int result = 0;
BtSlot temp[1];
+logseqno lsn;
BtPage page;
BtVal *val;
uid right;
}
}
+ // reserve enough buffer pool entries
+
+ avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1;
+ bt_availrequest (bt, avail);
+
// Load the leaf page for each key
// group same page references with reuse bit
// and determine any constraint violations
locks[src].reuse = 1;
}
+ // capture current lsn for master page
+
+ locks[src].reqlsn = set->page->lsn;
+
+ // perform constraint checks
+
switch( slotptr(source, src)->type ) {
case Duplicate:
case Unique:
// and add entries to redo log
- for( src = 0; src++ < source->cnt; ) {
- key = keyptr(source, src);
- val = valptr(source, src);
- switch( slotptr(source, src)->type ) {
- case Unique:
- type = BTRM_add;
- break;
- case Duplicate:
- type = BTRM_dup;
- break;
- case Delete:
- type = BTRM_del;
- break;
- case Update:
- type = BTRM_upd;
- break;
- }
-
- if( locks[src].lsn = bt_newredo (bt, type, 0, key, val) )
- continue;
-
- goto atomicerr;
- }
+ if( bt->mgr->redopages )
+ if( lsn = bt_txnredo (bt, source) )
+ for( src = 0; src++ < source->cnt; )
+ locks[src].lsn = lsn;
+ else
+ goto atomicerr;
// obtain write lock for each master page
// schedule prev fence key update
ptr = keyptr(prev->page,prev->page->cnt);
- leaf = calloc (sizeof(AtomicKey), 1);
+ leaf = calloc (sizeof(AtomicKey), 1), que++;
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
leaf->page_no = prev->latch->page_no;
// far right sibling or set rightmost page in page zero
if( right = bt_getid (prev->page->right) ) {
- if( set->latch = bt_pinlatch (bt, right, 1) )
+ if( set->latch = bt_pinlatch (bt, right, NULL) )
set->page = bt_mappage (bt, set->latch);
else
goto atomicerr;
bt_unlockpage (bt, BtLockWrite, set->latch);
bt_unpinlatch (set->latch);
} else { // prev is rightmost page
- bt_spinwritelock (bt->mgr->lock);
+ bt_mutexlock (bt->mgr->lock);
bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
- bt_spinreleasewrite(bt->mgr->lock);
+ bt_releasemutex(bt->mgr->lock);
}
// Process last page split in chain
ptr = keyptr(prev->page,prev->page->cnt);
- leaf = calloc (sizeof(AtomicKey), 1);
+ leaf = calloc (sizeof(AtomicKey), 1), que++;
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
leaf->page_no = prev->latch->page_no;
// perform the remainder of the delete
// from the FIFO queue
- leaf = calloc (sizeof(AtomicKey), 1);
+ leaf = calloc (sizeof(AtomicKey), 1), que++;
memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
leaf->page_no = prev->latch->page_no;
bt_unlockpage(bt, BtLockWrite, prev->latch);
}
+ bt_availrelease (bt, avail);
+
+ que *= bt->mgr->pagezero->alloc->lvl;
+ bt_availrequest (bt, que);
+
// add & delete keys for any pages split or merged during transaction
if( leaf = head )
free (leaf);
} while( leaf = tail );
+ bt_availrelease (bt, que);
+
// return success
free (locks);
uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
BtPageSet set[1];
- if( set->latch = bt_pinlatch (bt, page_no, 1) )
+ if( set->latch = bt_pinlatch (bt, page_no, NULL) )
set->page = bt_mappage (bt, set->latch);
else
return 0;
findourself:
bt->cursor_page = next;
- if( set->latch = bt_pinlatch (bt, next, 1) )
+ if( set->latch = bt_pinlatch (bt, next, NULL) )
set->page = bt_mappage (bt, set->latch);
else
return 0;
bt->cursor_page = right;
- if( set->latch = bt_pinlatch (bt, right, 1) )
+ if( set->latch = bt_pinlatch (bt, right, NULL) )
set->page = bt_mappage (bt, set->latch);
else
return 0;
void bt_poolaudit (BtMgr *mgr)
{
BtLatchSet *latch;
-uint slot = 0;
-
- while( slot++ < mgr->latchdeployed ) {
- latch = mgr->latchsets + slot;
-
- if( *latch->readwr->rin & MASK )
- fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
- memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
-
- if( *latch->access->rin & MASK )
- fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
- memset ((ushort *)latch->access, 0, sizeof(RWLock));
-
- if( *latch->parent->ticket != *latch->parent->serving )
- fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
- memset ((ushort *)latch->parent, 0, sizeof(RWLock));
-
- if( latch->pin & ~CLOCK_bit ) {
- fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
- latch->pin = 0;
- }
- }
-}
-
-uint bt_latchaudit (BtDb *bt)
-{
-ushort idx, hashidx;
-uid next, page_no;
-BtLatchSet *latch;
-uint cnt = 0;
-BtKey *ptr;
+uint entry = 0;
- if( *(ushort *)(bt->mgr->lock) )
- fprintf(stderr, "Alloc page locked\n");
- *(ushort *)(bt->mgr->lock) = 0;
+ while( ++entry < mgr->latchtotal ) {
+ latch = mgr->latchsets + entry;
- for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
- latch = bt->mgr->latchsets + idx;
if( *latch->readwr->rin & MASK )
- fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
- memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
+ fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no);
if( *latch->access->rin & MASK )
- fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
- memset ((ushort *)latch->access, 0, sizeof(RWLock));
-
- if( *latch->parent->ticket != *latch->parent->serving )
- fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
- memset ((ushort *)latch->parent, 0, sizeof(RWLock));
-
- if( latch->pin ) {
- fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
- latch->pin = 0;
- }
- }
-
- for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
- if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
- fprintf(stderr, "hash entry %d locked\n", hashidx);
-
- *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
-
- if( idx = bt->mgr->hashtable[hashidx].slot ) do {
- latch = bt->mgr->latchsets + idx;
- if( latch->pin )
- fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
- } while( idx = latch->next );
- }
-
- page_no = LEAF_page;
+ fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
- while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
- uid off = page_no << bt->mgr->page_bits;
-#ifdef unix
- pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
-#else
- DWORD amt[1];
+ if( *latch->parent->exclusive )
+ fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
- SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
+ if( *latch->atomic->exclusive )
+ fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
- if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
- return bt->err = BTERR_map;
+ if( *latch->modify->exclusive )
+ fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
- if( *amt < bt->mgr->page_size )
- return bt->err = BTERR_map;
-#endif
- if( !bt->frame->free && !bt->frame->lvl )
- cnt += bt->frame->act;
- page_no++;
+ if( latch->pin & ~CLOCK_bit )
+ fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no);
}
-
- cnt--; // remove stopper key
- fprintf(stderr, " Total keys read %d\n", cnt);
-
- bt_close (bt);
- return 0;
}
typedef struct {
switch(ch | 0x20)
{
- case 'a':
- fprintf(stderr, "started latch mgr audit\n");
- cnt = bt_latchaudit (bt);
- fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
- break;
-
case 'd':
type = Delete;
if( !args->num ) {
if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
- fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
+ fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
len = 0;
continue;
}
page->min = nxt;
if( bt_atomictxn (bt, page) )
- fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
+ fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
nxt = sizeof(txn);
cnt = 0;
line++;
if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
- fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
+ fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0);
len = 0;
}
else if( len < BT_maxkey )
if( bt_findkey (bt, key, len, NULL, 0) == 0 )
found++;
else if( bt->err )
- fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
+ fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->err, errno, bt->line, line), exit(0);
len = 0;
}
else if( len < BT_maxkey )
case 's':
fprintf(stderr, "started scanning\n");
+
do {
- if( set->latch = bt_pinlatch (bt, page_no, 1) )
+ if( set->latch = bt_pinlatch (bt, page_no, NULL) )
set->page = bt_mappage (bt, set->latch);
else
fprintf(stderr, "unable to obtain latch"), exit(1);
cnt++;
}
+ set->latch->avail = 1;
bt_unlockpage (bt, BtLockRead, set->latch);
bt_unpinlatch (set->latch);
} while( page_no = next );