From 4be215184b59e4f8dfebc0fa25a7a83dfcfed39c Mon Sep 17 00:00:00 2001 From: unknown Date: Wed, 8 Oct 2014 12:52:16 -0700 Subject: [PATCH] Threadskv9c.c version for durability progress made --- threadskv9.c => threadskv9c.c | 1138 +++++++++++++++++++-------------- 1 file changed, 664 insertions(+), 474 deletions(-) rename threadskv9.c => threadskv9c.c (80%) diff --git a/threadskv9.c b/threadskv9c.c similarity index 80% rename from threadskv9.c rename to threadskv9c.c index 1894d1f..6029526 100644 --- a/threadskv9.c +++ b/threadskv9c.c @@ -1,4 +1,4 @@ -// btree version threadskv9 sched_yield version +// btree version threadskv9c FUTEX version // with reworked bt_deletekey code, // phase-fair reader writer lock, // librarian page split code, @@ -8,7 +8,7 @@ // ACID batched key-value updates // and redo log for failure recovery -// 01 OCT 2014 +// 07 OCT 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -34,6 +34,8 @@ REDISTRIBUTION OF THIS SOFTWARE. #ifdef linux #define _GNU_SOURCE +#include +#define SYS_futex 202 #endif #ifdef unix @@ -45,6 +47,7 @@ REDISTRIBUTION OF THIS SOFTWARE. #include #include #include +#include #else #define WIN32_LEAN_AND_MEAN #include @@ -117,11 +120,10 @@ typedef struct { ushort dup; } RWLock; -// write only queue lock +// write only lock typedef struct { - volatile ushort ticket[1]; - volatile ushort serving[1]; + volatile uint exclusive[1]; ushort tid; ushort dup; } WOLock; @@ -131,44 +133,46 @@ typedef struct { #define MASK 0x3 #define RINC 0x4 -// definition for spin latch implementation +// lite weight mutex // exclusive is set for write access -// share is count of read accessors -// grant write lock when share == 0 -volatile typedef struct { - ushort exclusive:1; - ushort pending:1; - ushort share:14; -} BtSpinLatch; +typedef struct { + volatile uint exclusive[1]; +} BtMutexLatch; #define XCL 1 -#define PEND 2 -#define BOTH 3 -#define SHARE 4 + +// mode & definition for lite latch implementation + +enum { + QueRd = 1, // reader queue + QueWr = 2 // writer queue +} RWQueue; // hash table entries typedef struct { - volatile uint slot; // Latch table entry at head of chain - BtSpinLatch latch[1]; + volatile uint entry; // Latch table entry at head of chain + BtMutexLatch latch[1]; } BtHashEntry; // latch manager table structure typedef struct { - uid page_no; // latch set page number - RWLock readwr[1]; // read/write page lock - RWLock access[1]; // Access Intent/Page delete - WOLock parent[1]; // Posting of fence key in parent - WOLock atomic[1]; // Atomic update in progress - uint split; // right split page atomic insert - uint entry; // entry slot in latch table - uint next; // next entry in hash table chain - uint prev; // prev entry in hash table chain - volatile ushort pin; // number of outstanding threads - ushort dirty:1; // page in cache is dirty + uid page_no; // latch set page number + RWLock readwr[1]; // read/write page lock + RWLock access[1]; // Access Intent/Page delete + WOLock parent[1]; // Posting of fence key in parent + WOLock atomic[1]; // Atomic update in progress + uint split; // right split page atomic insert + uint entry; // entry slot in latch table + uint next; // next entry in hash table chain + uint prev; // prev entry in hash table chain + BtMutexLatch modify[1]; // modify entry lite latch + volatile ushort pin; // number of accessing threads + volatile unsigned char dirty; // page in cache is dirty (atomic set) + volatile unsigned char avail; // page is an available entry } BtLatchSet; // Define the length of the page record numbers @@ -248,7 +252,7 @@ typedef struct BtPage_ { unsigned char right[BtId]; // page number to right unsigned char left[BtId]; // page number to left unsigned char filler[2]; // padding to multiple of 8 - logseqno lsn; // last LogSeqNo applied to page + logseqno lsn; // log sequence number applied } *BtPage; // The loadpage interface object @@ -281,15 +285,18 @@ typedef struct { BtLatchSet *latchsets; // mapped latch set from buffer pool unsigned char *pagepool; // mapped to the buffer pool pages unsigned char *redobuff; // mapped recovery buffer pointer - logseqno flushlsn; // first lsn flushed w/msync - BtSpinLatch redo[1]; // redo area lite latch - BtSpinLatch lock[1]; // allocation area lite latch + logseqno lsn, flushlsn; // current & first lsn flushed + BtMutexLatch dump[1]; // redo dump lite latch + BtMutexLatch redo[1]; // redo area lite latch + BtMutexLatch lock[1]; // allocation area lite latch ushort thread_no[1]; // next thread number - uint latchdeployed; // highest number of latch entries deployed uint nlatchpage; // number of latch pages at BT_latch uint latchtotal; // number of page latch entries uint latchhash; // number of latch hash table slots uint latchvictim; // next latch entry to examine + uint latchavail; // next available latch entry + uint availlock[1]; // latch available chain commitments + uint available; // size of latch available chain uint redopages; // size of recovery buff in pages uint redoend; // eof/end element in recovery buff #ifndef unix @@ -307,6 +314,7 @@ typedef struct { unsigned char key[BT_keyarray]; // last found complete key int found; // last delete or insert was found int err; // last error + int line; // last error line no int reads, writes; // number of reads and writes from the btree ushort thread_no; // thread number } BtDb; @@ -322,7 +330,8 @@ typedef enum { BTERR_read, BTERR_wrt, BTERR_atomic, - BTERR_recovery + BTERR_recovery, + BTERR_avail } BTERR; #define CLOCK_bit 0x8000 @@ -336,14 +345,14 @@ typedef enum { BTRM_del, // delete a key-value from btree BTRM_upd, // update a key with a new value BTRM_new, // allocate a new empty page - BTRM_old, // reuse an old empty page - BTRM_end = 255 // circular buffer inter-gap + BTRM_old // reuse an old empty page } BTRM; // recovery manager entry // structure followed by BtKey & BtVal typedef struct { + logseqno reqlsn; // log sequence number required logseqno lsn; // log sequence number for entry uint len; // length of entry unsigned char type; // type of entry @@ -354,7 +363,8 @@ typedef struct { extern void bt_close (BtDb *bt); extern BtDb *bt_open (BtMgr *mgr); -extern void bt_flushlsn (BtDb *bt); +extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no); +extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no); extern void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch); extern void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch); extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update); @@ -368,6 +378,7 @@ extern uint bt_nextkey (BtDb *bt, uint slot); extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize, uint rmpages); extern void bt_mgrclose (BtMgr *mgr); extern logseqno bt_newredo (BtDb *bt, BTRM type, int lvl, BtKey *key, BtVal *val); +extern logseqno bt_txnredo (BtDb *bt, BtPage page); // Helper functions to return slot values // from the cursor page. @@ -457,26 +468,29 @@ uid bt_newdup (BtDb *bt) void WriteOLock (WOLock *lock, ushort tid) { -ushort tix; +uint prev; if( lock->tid == tid ) { lock->dup++; return; } + + while( 1 ) { #ifdef unix - tix = __sync_fetch_and_add (lock->ticket, 1); + prev = __sync_fetch_and_or (lock->exclusive, 1); #else - tix = _InterlockedExchangeAdd16 (lock->ticket, 1); + prev = _InterlockedExchangeOr (lock->exclusive, 1); #endif - // wait for our ticket to come up - - while( tix != lock->serving[0] ) + if( !(prev & XCL) ) { + lock->tid = tid; + return; + } #ifdef unix - sched_yield(); + sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr ); #else - SwitchToThread (); + SwitchToThread (); #endif - lock->tid = tid; + } } void WriteORelease (WOLock *lock) @@ -486,8 +500,11 @@ void WriteORelease (WOLock *lock) return; } + *lock->exclusive = 0; lock->tid = 0; - lock->serving[0]++; +#ifdef linux + sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ); +#endif } // Phase-Fair reader/writer lock implementation @@ -545,6 +562,37 @@ void WriteRelease (RWLock *lock) lock->serving[0]++; } +// try to obtain read lock +// return 1 if successful + +int ReadTry (RWLock *lock, ushort tid) +{ +ushort w; + + // OK if write lock already held by same thread + + if( lock->tid == tid ) { + lock->dup++; + return 1; + } +#ifdef unix + w = __sync_fetch_and_add (lock->rin, RINC) & MASK; +#else + w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK; +#endif + if( w ) + if( w == (*lock->rin & MASK) ) { +#ifdef unix + __sync_fetch_and_add (lock->rin, -RINC); +#else + _InterlockedExchangeAdd16 (lock->rin, -RINC); +#endif + return 0; + } + + return 1; +} + void ReadLock (RWLock *lock, ushort tid) { ushort w; @@ -580,63 +628,31 @@ void ReadRelease (RWLock *lock) #endif } -// Spin Latch Manager - -// wait until write lock mode is clear -// and add 1 to the share count +// lite weight spin lock Latch Manager -void bt_spinreadlock(BtSpinLatch *latch) +int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3) { -ushort prev; - - do { -#ifdef unix - prev = __sync_fetch_and_add ((ushort *)latch, SHARE); -#else - prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE); -#endif - // see if exclusive request is granted or pending - - if( !(prev & BOTH) ) - return; -#ifdef unix - prev = __sync_fetch_and_add ((ushort *)latch, -SHARE); -#else - prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE); -#endif -#ifdef unix - } while( sched_yield(), 1 ); -#else - } while( SwitchToThread(), 1 ); -#endif + return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); } -// wait for other read and write latches to relinquish - -void bt_spinwritelock(BtSpinLatch *latch) +void bt_mutexlock(BtMutexLatch *latch) { -ushort prev; +uint prev; - do { + while( 1 ) { #ifdef unix - prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL); + prev = __sync_fetch_and_or(latch->exclusive, XCL); #else - prev = _InterlockedOr16((ushort *)latch, PEND | XCL); + prev = _InterlockedOr(latch->exclusive, XCL); #endif if( !(prev & XCL) ) - if( !(prev & ~BOTH) ) return; - else -#ifdef unix - __sync_fetch_and_and ((ushort *)latch, ~XCL); -#else - _InterlockedAnd16((ushort *)latch, ~XCL); -#endif #ifdef unix - } while( sched_yield(), 1 ); + sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr ); #else - } while( SwitchToThread(), 1 ); + SwitchToThread(); #endif + } } // try to obtain write lock @@ -644,69 +660,122 @@ ushort prev; // return 1 if obtained, // 0 otherwise -int bt_spinwritetry(BtSpinLatch *latch) +int bt_mutextry(BtMutexLatch *latch) { -ushort prev; +uint prev; #ifdef unix - prev = __sync_fetch_and_or((ushort *)latch, XCL); + prev = __sync_fetch_and_or(latch->exclusive, XCL); #else - prev = _InterlockedOr16((ushort *)latch, XCL); + prev = _InterlockedOr(latch->exclusive, XCL); #endif - // take write access if all bits are clear + // take write access if exclusive bit is clear - if( !(prev & XCL) ) - if( !(prev & ~BOTH) ) - return 1; - else -#ifdef unix - __sync_fetch_and_and ((ushort *)latch, ~XCL); -#else - _InterlockedAnd16((ushort *)latch, ~XCL); -#endif - return 0; + return !(prev & XCL); } // clear write mode -void bt_spinreleasewrite(BtSpinLatch *latch) +void bt_releasemutex(BtMutexLatch *latch) { + *latch->exclusive = 0; #ifdef unix - __sync_fetch_and_and((ushort *)latch, ~BOTH); -#else - _InterlockedAnd16((ushort *)latch, ~BOTH); + sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ); #endif } -// decrement reader count +// recovery manager -- flush dirty pages -void bt_spinreleaseread(BtSpinLatch *latch) +void bt_flushlsn (BtDb *bt) { -#ifdef unix - __sync_fetch_and_add((ushort *)latch, -SHARE); -#else - _InterlockedExchangeAdd16((ushort *)latch, -SHARE); -#endif +uint cnt3 = 0, cnt2 = 0, cnt = 0; +BtLatchSet *latch; +BtPage page; +uint entry; + + // flush dirty pool pages to the btree that were + // dirty before the start of this redo recovery buffer +fprintf(stderr, "Start flushlsn\n"); + for( entry = 1; entry < bt->mgr->latchtotal; entry++ ) { + page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool); + latch = bt->mgr->latchsets + entry; + bt_mutexlock (latch->modify); + bt_lockpage(bt, BtLockRead, latch); + + if( latch->dirty ) { + bt_writepage(bt->mgr, page, latch->page_no); + latch->dirty = 0, bt->writes++, cnt++; + } +if( latch->avail ) +cnt3++; +if( latch->pin & ~CLOCK_bit ) +cnt2++; + bt_unlockpage(bt, BtLockRead, latch); + bt_releasemutex (latch->modify); + } +fprintf(stderr, "End flushlsn %d pages %d pinned %d available\n", cnt, cnt2, cnt3); +} + +// recovery manager -- process current recovery buff on startup +// this won't do much if previous session was properly closed. + +BTERR bt_recoveryredo (BtMgr *mgr) +{ +BtLogHdr *hdr, *eof; +uint offset = 0; +BtKey *key; +BtVal *val; + + pread (mgr->idx, mgr->redobuff, mgr->redopages << mgr->page_size, REDO_page << mgr->page_size); + + hdr = (BtLogHdr *)mgr->redobuff; + mgr->flushlsn = hdr->lsn; + + while( 1 ) { + hdr = (BtLogHdr *)(mgr->redobuff + offset); + switch( hdr->type ) { + case BTRM_eof: + mgr->lsn = hdr->lsn; + return 0; + case BTRM_add: // add a unique key-value to btree + + case BTRM_dup: // add a duplicate key-value to btree + case BTRM_del: // delete a key-value from btree + case BTRM_upd: // update a key with a new value + case BTRM_new: // allocate a new empty page + case BTRM_old: // reuse an old empty page + return 0; + } + } } -// recovery manager -- dump current recovery buff & flush +// recovery manager -- dump current recovery buff & flush dirty pages +// in preparation for next recovery buffer. BTERR bt_dumpredo (BtDb *bt) { BtLogHdr *eof; +fprintf(stderr, "Flush pages "); eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend); memset (eof, 0, sizeof(BtLogHdr)); - pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits); - // flush pages written at beginning of this redo buffer - // along with the redo buffer out to disk + // then write the redo buffer out to disk fdatasync (bt->mgr->idx); - bt->mgr->flushlsn = bt->mgr->pagezero->alloc->lsn; +fprintf(stderr, "Dump ReDo: %d bytes\n", bt->mgr->redoend); + pwrite (bt->mgr->idx, bt->mgr->redobuff, bt->mgr->redoend + sizeof(BtLogHdr), REDO_page << bt->mgr->page_bits); + + sync_file_range (bt->mgr->idx, REDO_page << bt->mgr->page_bits, bt->mgr->redoend + sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER); + + bt->mgr->flushlsn = bt->mgr->lsn; bt->mgr->redoend = 0; + + eof = (BtLogHdr *)(bt->mgr->redobuff); + memset (eof, 0, sizeof(BtLogHdr)); + eof->lsn = bt->mgr->lsn; return 0; } @@ -720,7 +789,7 @@ uint amt = sizeof(BtLogHdr); BtLogHdr *hdr, *eof; uint flush; - bt_spinwritelock (bt->mgr->redo); + bt_mutexlock (bt->mgr->redo); if( key ) amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal); @@ -728,9 +797,12 @@ uint flush; // see if new entry fits in buffer // flush and reset if it doesn't - if( flush = amt > size - bt->mgr->redoend ) + if( flush = amt > size - bt->mgr->redoend ) { + bt_mutexlock (bt->mgr->dump); + if( bt_dumpredo (bt) ) return 0; + } // fill in new entry & either eof or end block @@ -739,7 +811,7 @@ uint flush; hdr->len = amt; hdr->type = type; hdr->lvl = lvl; - hdr->lsn = ++bt->mgr->pagezero->alloc->lsn; + hdr->lsn = ++bt->mgr->lsn; bt->mgr->redoend += amt; @@ -753,14 +825,105 @@ uint flush; memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal)); } - bt_spinreleasewrite(bt->mgr->redo); + bt_releasemutex(bt->mgr->redo); - if( flush ) + if( flush ) { bt_flushlsn (bt); + bt_releasemutex(bt->mgr->dump); + } return hdr->lsn; } +// recovery manager -- append transaction to recovery log +// flush to disk when it overflows. + +logseqno bt_txnredo (BtDb *bt, BtPage source) +{ +uint size = bt->mgr->page_size * bt->mgr->redopages - sizeof(BtLogHdr); +uint amt = 0, src, type; +BtLogHdr *hdr, *eof; +logseqno lsn; +uint flush; +BtKey *key; +BtVal *val; + + // determine amount of redo recovery log space required + + for( src = 0; src++ < source->cnt; ) { + key = keyptr(source,src); + val = valptr(source,src); + amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal); + amt += sizeof(BtLogHdr); + } + + bt_mutexlock (bt->mgr->redo); + + // see if new entry fits in buffer + // flush and reset if it doesn't + + if( flush = amt > size - bt->mgr->redoend ) { + bt_mutexlock (bt->mgr->dump); + + if( bt_dumpredo (bt) ) + return 0; + } + + // assign new lsn to transaction + + lsn = ++bt->mgr->lsn; + + // fill in new entries + + for( src = 0; src++ < source->cnt; ) { + key = keyptr(source, src); + val = valptr(source, src); + + switch( slotptr(source, src)->type ) { + case Unique: + type = BTRM_add; + break; + case Duplicate: + type = BTRM_dup; + break; + case Delete: + type = BTRM_del; + break; + case Update: + type = BTRM_upd; + break; + } + + amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal); + amt += sizeof(BtLogHdr); + + hdr = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend); + hdr->len = amt; + hdr->type = type; + hdr->lsn = lsn; + hdr->lvl = 0; + + // fill in key and value + + memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey)); + memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal)); + + bt->mgr->redoend += amt; + } + + eof = (BtLogHdr *)(bt->mgr->redobuff + bt->mgr->redoend); + memset (eof, 0, sizeof(BtLogHdr)); + + bt_releasemutex(bt->mgr->redo); + + if( flush ) { + bt_flushlsn (bt); + bt_releasemutex(bt->mgr->dump); + } + + return lsn; +} + // read page into buffer pool from permanent location in Btree file BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no) @@ -769,7 +932,7 @@ off64_t off = page_no << mgr->page_bits; #ifdef unix if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) { - fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno); + fprintf (stderr, "Unable to read page %d errno = %d\n", page_no, errno); return BTERR_read; } #else @@ -781,7 +944,7 @@ uint amt[1]; ovl->OffsetHigh = off >> 32; if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) { - fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError()); + fprintf (stderr, "Unable to read page %d GetLastError = %d\n", page_no, GetLastError()); return BTERR_read; } if( *amt < mgr->page_size ) { @@ -819,46 +982,15 @@ uint amt[1]; return 0; } -// link latch table entry into head of latch hash table - -BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit) -{ -BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool); -BtLatchSet *latch = bt->mgr->latchsets + slot; - - if( latch->next = bt->mgr->hashtable[hashidx].slot ) - bt->mgr->latchsets[latch->next].prev = slot; - - bt->mgr->hashtable[hashidx].slot = slot; - latch->page_no = page_no; - latch->entry = slot; - latch->split = 0; - latch->prev = 0; - latch->pin = 1; - - if( loadit ) - if( bt->err = bt_readpage (bt->mgr, page, page_no) ) - return bt->err; - else - bt->reads++; - - return bt->err = 0; -} - // set CLOCK bit in latch // decrement pin count void bt_unpinlatch (BtLatchSet *latch) { -#ifdef unix - if( ~latch->pin & CLOCK_bit ) - __sync_fetch_and_or(&latch->pin, CLOCK_bit); - __sync_fetch_and_add(&latch->pin, -1); -#else - if( ~latch->pin & CLOCK_bit ) - _InterlockedOr16 (&latch->pin, CLOCK_bit); - _InterlockedDecrement16 (&latch->pin); -#endif + bt_mutexlock(latch->modify); + latch->pin |= CLOCK_bit; + latch->pin--; + bt_releasemutex(latch->modify); } // return the btree cached page address @@ -870,203 +1002,294 @@ BtPage bt_mappage (BtDb *bt, BtLatchSet *latch) { BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool); - if( latch->dirty ) - if( page->lsn < bt->mgr->flushlsn ) - if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) ) - return NULL; - else - latch->dirty = 0, bt->writes++; - return page; } -// find existing latchset or inspire new one -// return with latchset pinned +// return next available latch entry +// and with latch entry locked -BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit) +uint bt_availnext (BtDb *bt) { -uint hashidx = page_no % bt->mgr->latchhash; BtLatchSet *latch; -uint attempts = 0; -uint slot, idx; -uint lvl, cnt; -off64_t off; -uint amt[1]; -BtPage page; +uint entry; - // try to find our entry + while( 1 ) { +#ifdef unix + entry = __sync_fetch_and_add (&bt->mgr->latchavail, 1) + 1; +#else + entry = _InterlockedIncrement (&bt->mgr->latchavail); +#endif + entry %= bt->mgr->latchtotal; - bt_spinwritelock(bt->mgr->hashtable[hashidx].latch); + if( !entry ) + continue; - if( slot = bt->mgr->hashtable[hashidx].slot ) do - { - latch = bt->mgr->latchsets + slot; - if( page_no == latch->page_no ) - break; - } while( slot = latch->next ); + latch = bt->mgr->latchsets + entry; - // found our entry - // increment clock + if( !latch->avail ) + continue; - if( slot ) { - latch = bt->mgr->latchsets + slot; -#ifdef unix - __sync_fetch_and_add(&latch->pin, 1); -#else - _InterlockedIncrement16 (&latch->pin); -#endif - bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch); - return latch; + bt_mutexlock(latch->modify); + + if( !latch->avail ) { + bt_releasemutex(latch->modify); + continue; + } + + return entry; } +} - // see if there are any unused pool entries -#ifdef unix - slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1; -#else - slot = _InterlockedIncrement (&bt->mgr->latchdeployed); -#endif +// find and add the next available latch entry +// to the queue - if( slot < bt->mgr->latchtotal ) { - latch = bt->mgr->latchsets + slot; - if( bt_latchlink (bt, hashidx, slot, page_no, loadit) ) - return NULL; - bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch); - return latch; - } +BTERR bt_availlatch (BtDb *bt) +{ +BtLatchSet *latch; +uint startattempt; +uint cnt, entry; +uint hashidx; +BtPage page; -#ifdef unix - __sync_fetch_and_add (&bt->mgr->latchdeployed, -1); -#else - _InterlockedDecrement (&bt->mgr->latchdeployed); -#endif // find and reuse previous entry on victim + startattempt = bt->mgr->latchvictim; + while( 1 ) { #ifdef unix - slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1); + entry = __sync_fetch_and_add(&bt->mgr->latchvictim, 1); #else - slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1; + entry = _InterlockedIncrement (&bt->mgr->latchvictim) - 1; #endif - // try to get write lock on hash chain - // skip entry if not obtained - // or has outstanding pins + // skip entry if it has outstanding pins - slot %= bt->mgr->latchtotal; + entry %= bt->mgr->latchtotal; - if( !slot ) + if( !entry ) continue; - // only go around two times before + // only go around one time before // flushing redo recovery buffer, - // and the buffer pool. + // and the buffer pool to free up entries. if( bt->mgr->redopages ) - if( attempts++ > 2 * bt->mgr->latchtotal ) { - if( bt_dumpredo (bt) ) - return NULL; - bt_flushlsn (bt); - attempts = 0; + if( bt->mgr->latchvictim - startattempt > bt->mgr->latchtotal ) { + if( bt_mutextry (bt->mgr->dump) ) { + if( bt_dumpredo (bt) ) + return bt->err; + bt_flushlsn (bt); + // synchronize the various threads running into this condition + // so that only one thread does the dump and flush + } else + bt_mutexlock(bt->mgr->dump); + + startattempt = bt->mgr->latchvictim; + bt_releasemutex(bt->mgr->dump); } - latch = bt->mgr->latchsets + slot; - idx = latch->page_no % bt->mgr->latchhash; + latch = bt->mgr->latchsets + entry; - // see we are on same chain as hashidx + if( latch->avail ) + continue; - if( idx == hashidx ) - continue; + bt_mutexlock(latch->modify); - if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) ) - continue; + // skip if already an available entry + + if( latch->avail ) { + bt_releasemutex(latch->modify); + continue; + } - // skip this slot if it is pinned - // or the CLOCK bit is set + // skip this entry if it is pinned + // if the CLOCK bit is set + // reset it to zero. if( latch->pin ) { - if( latch->pin & CLOCK_bit ) { -#ifdef unix - __sync_fetch_and_and(&latch->pin, ~CLOCK_bit); -#else - _InterlockedAnd16 (&latch->pin, ~CLOCK_bit); -#endif - } - bt_spinreleasewrite (bt->mgr->hashtable[idx].latch); + latch->pin &= ~CLOCK_bit; + bt_releasemutex(latch->modify); continue; } - page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool); + page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool); // if dirty page has lsn >= last redo recovery buffer - // then hold page in the buffer pool until redo - // recovery buffer is written to disk. + // then hold page in the buffer pool until next redo + // recovery buffer is being written to disk. if( latch->dirty ) if( page->lsn >= bt->mgr->flushlsn ) { - bt_spinreleasewrite (bt->mgr->hashtable[idx].latch); + bt_releasemutex(latch->modify); continue; } - - // update permanent page area in btree from buffer pool - // no read-lock is required since page is not pinned. - if( latch->dirty ) - if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) ) - return NULL; - else - latch->dirty = 0, bt->writes++; - - // unlink our available slot from its hash chain + // entry is available +#ifdef unix + __sync_fetch_and_add (&bt->mgr->available, 1); +#else + _InterlockedIncrement(&bt->mgr->available); +#endif + latch->avail = 1; + bt_releasemutex(latch->modify); + return 0; + } +} - if( latch->prev ) - bt->mgr->latchsets[latch->prev].next = latch->next; - else - bt->mgr->hashtable[idx].slot = latch->next; +// release available latch requests - if( latch->next ) - bt->mgr->latchsets[latch->next].prev = latch->prev; +void bt_availrelease (BtDb *bt, uint count) +{ +#ifdef unix + __sync_fetch_and_add(bt->mgr->availlock, -count); +#else + _InterlockedAdd(bt->mgr->availlock, -count); +#endif +} - bt_spinreleasewrite (bt->mgr->hashtable[idx].latch); +// commit available chain entries +// find available entries as required - if( bt_latchlink (bt, hashidx, slot, page_no, loadit) ) - return NULL; +void bt_availrequest (BtDb *bt, uint count) +{ +#ifdef unix + __sync_fetch_and_add(bt->mgr->availlock, count); +#else + _InterlockedAdd(bt->mgr->availlock, count); +#endif - bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch); - return latch; - } + while( *bt->mgr->availlock > bt->mgr->available ) + bt_availlatch (bt); } -// flush pages +// find available latchset +// return with latchset pinned -void bt_flushlsn (BtDb *bt) +BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, BtPage loadit) { +uint hashidx = page_no % bt->mgr->latchhash; BtLatchSet *latch; -uint hashidx; -uint num = 0; +uint entry, idx; BtPage page; -uint slot; - // flush dirty pool pages to the btree that were - // dirty before the start of this redo recovery buffer + // try to find our entry - for( slot = 1; slot <= bt->mgr->latchdeployed; slot++ ) { - page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool); - latch = bt->mgr->latchsets + slot; -// hashidx = latch->page_no % bt->mgr->latchhash; + bt_mutexlock(bt->mgr->hashtable[hashidx].latch); -// if( !bt_spinwritetry (bt->mgr->hashtable[hashidx].latch) ) -// continue; + if( entry = bt->mgr->hashtable[hashidx].entry ) do + { + latch = bt->mgr->latchsets + entry; + if( page_no == latch->page_no ) + break; + } while( entry = latch->next ); - if( latch->dirty ) { - bt_lockpage(bt, BtLockRead, latch); - bt_writepage(bt->mgr, page, latch->page_no); - latch->dirty = 0, bt->writes++; - bt_unlockpage(bt, BtLockRead, latch); - } + // found our entry: increment pin + // remove from available status -// bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch); + if( entry ) { + latch = bt->mgr->latchsets + entry; + bt_mutexlock(latch->modify); + if( latch->avail ) +#ifdef unix + __sync_fetch_and_add (&bt->mgr->available, -1); +#else + _InterlockedDecrement(&bt->mgr->available); +#endif + latch->avail = 0; + latch->pin |= CLOCK_bit; + latch->pin++; + + bt_releasemutex(latch->modify); + bt_releasemutex(bt->mgr->hashtable[hashidx].latch); + return latch; + } + + // find and reuse entry from available set + +trynext: + + if( entry = bt_availnext (bt) ) + latch = bt->mgr->latchsets + entry; + else + return bt->line = __LINE__, bt->err = BTERR_avail, NULL; + + idx = latch->page_no % bt->mgr->latchhash; + + // if latch is on a different hash chain + // unlink from the old page_no chain + + if( latch->page_no ) + if( idx != hashidx ) { + + // skip over this entry if latch not available + + if( !bt_mutextry (bt->mgr->hashtable[idx].latch) ) { + bt_releasemutex(latch->modify); + goto trynext; } -} + if( latch->prev ) + bt->mgr->latchsets[latch->prev].next = latch->next; + else + bt->mgr->hashtable[idx].entry = latch->next; + + if( latch->next ) + bt->mgr->latchsets[latch->next].prev = latch->prev; + + bt_releasemutex (bt->mgr->hashtable[idx].latch); + } + + // remove available status + + latch->avail = 0; +#ifdef unix + __sync_fetch_and_add (&bt->mgr->available, -1); +#else + _InterlockedDecrement(&bt->mgr->available); +#endif + page = (BtPage)(((uid)entry << bt->mgr->page_bits) + bt->mgr->pagepool); + + if( latch->dirty ) + if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) ) + return bt->line = __LINE__, NULL; + else + latch->dirty = 0, bt->writes++; + + if( loadit ) { + memcpy (page, loadit, bt->mgr->page_size); + latch->dirty = 1; + } else + if( bt->err = bt_readpage (bt->mgr, page, page_no) ) + return bt->line = __LINE__, NULL; + else + bt->reads++; + + // link page as head of hash table chain + // if this is a never before used entry, + // or it was previously on a different + // hash table chain. Otherwise, just + // leave it in its current hash table + // chain position. + + if( !latch->page_no || hashidx != idx ) { + if( latch->next = bt->mgr->hashtable[hashidx].entry ) + bt->mgr->latchsets[latch->next].prev = entry; + + bt->mgr->hashtable[hashidx].entry = entry; + latch->prev = 0; + } + + // fill in latch structure + + latch->pin = CLOCK_bit | 1; + latch->page_no = page_no; + latch->entry = entry; + latch->split = 0; + + bt_releasemutex (latch->modify); + bt_releasemutex (bt->mgr->hashtable[hashidx].latch); + return latch; +} + void bt_mgrclose (BtMgr *mgr) { BtLatchSet *latch; @@ -1075,7 +1298,10 @@ uint num = 0; BtPage page; uint slot; - // flush recovery buffer to disk + // flush previously written dirty pages + // and write recovery buffer to disk + + fdatasync (mgr->idx); if( mgr->redoend ) { eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend); @@ -1084,9 +1310,9 @@ uint slot; pwrite (mgr->idx, mgr->redobuff, mgr->redoend + sizeof(BtLogHdr), REDO_page << mgr->page_bits); } - // flush dirty pool pages to the btree + // write remaining dirty pool pages to the btree - for( slot = 1; slot <= mgr->latchdeployed; slot++ ) { + for( slot = 1; slot < mgr->latchtotal; slot++ ) { page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool); latch = mgr->latchsets + slot; @@ -1096,15 +1322,28 @@ uint slot; } } + // flush last batch to disk and clear + // redo recovery buffer on disk. + + fdatasync (mgr->idx); + + eof = (BtLogHdr *)mgr->redobuff; + memset (eof, 0, sizeof(BtLogHdr)); + eof->lsn = mgr->lsn; + + pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits); + + sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER); + fprintf(stderr, "%d buffer pool pages flushed\n", num); #ifdef unix - munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits); + munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits); munmap (mgr->pagezero, mgr->page_size); #else FlushViewOfFile(mgr->pagezero, 0); UnmapViewOfFile(mgr->pagezero); - UnmapViewOfFile(mgr->hashtable); + UnmapViewOfFile(mgr->pagepool); CloseHandle(mgr->halloc); CloseHandle(mgr->hpool); #endif @@ -1146,6 +1385,7 @@ uint nlatchpage, latchhash; unsigned char value[BtId]; int flag, initit = 0; BtPageZero *pagezero; +BtLatchSet *latch; off64_t size; uint amt[1]; BtMgr* mgr; @@ -1217,11 +1457,10 @@ BtVal *val; // calculate number of latch hash table entries - mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size; - mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry); + mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size; mgr->nlatchpage += nodemax; // size of the buffer pool in pages - mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size; + mgr->nlatchpage += (sizeof(BtLatchSet) * (uid)nodemax + mgr->page_size - 1)/mgr->page_size; mgr->latchtotal = nodemax; if( !initit ) @@ -1287,8 +1526,8 @@ mgrlatch: } mlock (mgr->pagezero, mgr->page_size); - mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0); - if( mgr->hashtable == MAP_FAILED ) { + mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0); + if( mgr->pagepool == MAP_FAILED ) { fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno); return bt_mgrclose (mgr), NULL; } @@ -1318,8 +1557,8 @@ mgrlatch: } flag = FILE_MAP_WRITE; - mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size); - if( !mgr->hashtable ) { + mgr->pagepool = MapViewOfFile(mgr->pool, flag, 0, 0, size); + if( !mgr->pagepool ) { fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError()); return bt_mgrclose (mgr), NULL; } @@ -1327,8 +1566,17 @@ mgrlatch: mgr->redobuff = VirtualAlloc (NULL, redopages * mgr->page_size | MEM_COMMIT, PAGE_READWRITE); #endif - mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits); - mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet)); + mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits)); + mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal); + mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry); + + // mark all pool entries as available + + for( idx = 1; idx < mgr->latchtotal; idx++ ) { + latch = mgr->latchsets + idx; + latch->avail = 1; + mgr->available++; + } return mgr; } @@ -1449,19 +1697,19 @@ int blk; // lock allocation page - bt_spinwritelock(bt->mgr->lock); + bt_mutexlock(bt->mgr->lock); // use empty chain first // else allocate empty page if( page_no = bt_getid(bt->mgr->pagezero->chain) ) { - if( set->latch = bt_pinlatch (bt, page_no, 1) ) + if( set->latch = bt_pinlatch (bt, page_no, NULL) ) set->page = bt_mappage (bt, set->latch); else - return bt->err = BTERR_struct, -1; + return bt->err = BTERR_struct, bt->line = __LINE__, -1; bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right)); - bt_spinreleasewrite(bt->mgr->lock); + bt_releasemutex(bt->mgr->lock); memcpy (set->page, contents, bt->mgr->page_size); set->latch->dirty = 1; @@ -1471,18 +1719,18 @@ int blk; page_no = bt_getid(bt->mgr->pagezero->alloc->right); bt_putid(bt->mgr->pagezero->alloc->right, page_no+1); - // unlock allocation latch + // unlock allocation latch and + // extend file into new page. - bt_spinreleasewrite(bt->mgr->lock); + bt_releasemutex(bt->mgr->lock); // don't load cache from btree page - if( set->latch = bt_pinlatch (bt, page_no, 0) ) + if( set->latch = bt_pinlatch (bt, page_no, contents) ) set->page = bt_mappage (bt, set->latch); else - return bt->err = BTERR_struct; + return bt->err; - memcpy (set->page, contents, bt->mgr->page_size); set->latch->dirty = 1; return 0; } @@ -1529,6 +1777,7 @@ uid page_no = ROOT_page, prevpage = 0; uint drill = 0xff, slot; BtLatchSet *prevlatch; uint mode, prevmode; +BtVal *val; // start at root of btree and drill down @@ -1536,7 +1785,7 @@ uint mode, prevmode; // determine lock mode of drill level mode = (drill == lvl) ? lock : BtLockRead; - if( !(set->latch = bt_pinlatch (bt, page_no, 1)) ) + if( !(set->latch = bt_pinlatch (bt, page_no, NULL)) ) return 0; // obtain access lock using lock chaining with Access mode @@ -1559,7 +1808,7 @@ uint mode, prevmode; bt_lockpage(bt, mode, set->latch); if( set->page->free ) - return bt->err = BTERR_struct, 0; + return bt->err = BTERR_struct, bt->line = __LINE__, 0; if( page_no > ROOT_page ) bt_unlockpage(bt, BtLockAccess, set->latch); @@ -1568,7 +1817,7 @@ uint mode, prevmode; if( set->page->lvl != drill) { if( set->latch->page_no != ROOT_page ) - return bt->err = BTERR_struct, 0; + return bt->err = BTERR_struct, bt->line = __LINE__, 0; drill = set->page->lvl; @@ -1597,21 +1846,27 @@ uint mode, prevmode; if( slot++ < set->page->cnt ) continue; else - return bt->err = BTERR_struct, 0; + return bt->err = BTERR_struct, bt->line = __LINE__, 0; + + val = valptr(set->page, slot); + + if( val->len == BtId ) + page_no = bt_getid(valptr(set->page, slot)->value); + else + return bt->line = __LINE__, bt->err = BTERR_struct, 0; - page_no = bt_getid(valptr(set->page, slot)->value); drill--; continue; } - // or slide right into next page + // slide right into next page page_no = bt_getid(set->page->right); } while( page_no ); // return error on end of right chain - bt->err = BTERR_struct; + bt->line = __LINE__, bt->err = BTERR_struct; return 0; // return error } @@ -1622,7 +1877,7 @@ void bt_freepage (BtDb *bt, BtPageSet *set) { // lock allocation page - bt_spinwritelock (bt->mgr->lock); + bt_mutexlock (bt->mgr->lock); // store chain @@ -1639,7 +1894,7 @@ void bt_freepage (BtDb *bt, BtPageSet *set) // unlock allocation page - bt_spinreleasewrite (bt->mgr->lock); + bt_releasemutex (bt->mgr->lock); } // a fence key was deleted from a page @@ -1694,6 +1949,7 @@ BTERR bt_collapseroot (BtDb *bt, BtPageSet *root) { BtPageSet child[1]; uid page_no; +BtVal *val; uint idx; // find the child entry and promote as new root contents @@ -1703,9 +1959,14 @@ uint idx; if( !slotptr(root->page, idx)->dead ) break; - page_no = bt_getid (valptr(root->page, idx)->value); + val = valptr(root->page, idx); - if( child->latch = bt_pinlatch (bt, page_no, 1) ) + if( val->len == BtId ) + page_no = bt_getid (valptr(root->page, idx)->value); + else + return bt->line = __LINE__, bt->err = BTERR_struct; + + if( child->latch = bt_pinlatch (bt, page_no, NULL) ) child->page = bt_mappage (bt, child->latch); else return bt->err; @@ -1748,7 +2009,7 @@ BtKey *ptr; page_no = bt_getid(set->page->right); - if( right->latch = bt_pinlatch (bt, page_no, 1) ) + if( right->latch = bt_pinlatch (bt, page_no, NULL) ) right->page = bt_mappage (bt, right->latch); else return 0; @@ -1761,7 +2022,7 @@ BtKey *ptr; memcpy (higherfence, ptr, ptr->len + sizeof(BtKey)); if( right->page->kill ) - return bt->err = BTERR_struct; + return bt->line = __LINE__, bt->err = BTERR_struct; // pull contents of right peer into our empty page @@ -1895,12 +2156,12 @@ uid page_no; prevlatch = set->latch; if( page_no = bt_getid(set->page->right) ) - if( set->latch = bt_pinlatch (bt, page_no, 1) ) + if( set->latch = bt_pinlatch (bt, page_no, NULL) ) set->page = bt_mappage (bt, set->latch); else return 0; else - return bt->err = BTERR_struct, 0; + return bt->err = BTERR_struct, bt->line = __LINE__, 0; // obtain access lock using lock chaining with Access mode @@ -2004,6 +2265,7 @@ BtVal *val; memset (page+1, 0, bt->mgr->page_size - sizeof(*page)); set->latch->dirty = 1; + page->garbage = 0; page->act = 0; @@ -2197,6 +2459,8 @@ uint prev; if( bt_newpage(bt, right, bt->frame) ) return 0; + // process lower keys + memcpy (bt->frame, set->page, bt->mgr->page_size); memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page)); set->latch->dirty = 1; @@ -2410,7 +2674,7 @@ uint type; ptr = keyptr(set->page, slot); else { if( !bt->err ) - bt->err = BTERR_ovflw; + bt->line = __LINE__, bt->err = BTERR_ovflw; return bt->err; } @@ -2495,6 +2759,7 @@ uint type; } typedef struct { + logseqno reqlsn; // redo log seq no required logseqno lsn; // redo log sequence number uint entry; // latch table entry number uint slot:31; // page slot number @@ -2548,7 +2813,7 @@ uint entry; } } while( entry = set->latch->split ); - bt->err = BTERR_atomic; + bt->line = __LINE__, bt->err = BTERR_atomic; return 0; } @@ -2582,21 +2847,21 @@ uint entry, slot; locks[src].slot = 0; } - return bt->err = BTERR_atomic; + return bt->line = __LINE__, bt->err = BTERR_atomic; } BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src) { BtKey *key = keyptr(source, src); -uint idx, entry, slot; BtPageSet set[1]; +uint idx, slot; BtKey *ptr; BtVal *val; if( slot = bt_atomicpage (bt, source, locks, src, set) ) ptr = keyptr(set->page, slot); else - return bt->err = BTERR_struct; + return bt->line = __LINE__, bt->err = BTERR_struct; if( !keycmp (ptr, key->key, key->len) ) if( !slotptr(set->page, slot)->dead ) @@ -2608,8 +2873,8 @@ BtVal *val; val = valptr(set->page, slot); set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal); - set->page->lsn = locks[src].lsn; set->latch->dirty = 1; + set->page->lsn = locks[src].lsn; set->page->act--; bt->found++; return 0; @@ -2633,7 +2898,7 @@ BtKey *ptr; // grab the right sibling - if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) ) + if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), NULL) ) right->page = bt_mappage (bt, right->latch); else return bt->err; @@ -2673,7 +2938,7 @@ BtKey *ptr; // to remove scanner's poiner to the right page if( right_page_no = bt_getid (prev->page->right) ) { - if( temp->latch = bt_pinlatch (bt, right_page_no, 1) ) + if( temp->latch = bt_pinlatch (bt, right_page_no, NULL) ) temp->page = bt_mappage (bt, temp->latch); bt_lockpage (bt, BtLockWrite, temp->latch); @@ -2683,9 +2948,9 @@ BtKey *ptr; bt_unlockpage (bt, BtLockWrite, temp->latch); bt_unpinlatch (temp->latch); } else { // master is now the far right page - bt_spinwritelock (bt->mgr->lock); + bt_mutexlock (bt->mgr->lock); bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); - bt_spinreleasewrite(bt->mgr->lock); + bt_releasemutex(bt->mgr->lock); } // now that there are no pointers to the right page @@ -2708,7 +2973,7 @@ BtKey *ptr; int bt_atomictxn (BtDb *bt, BtPage source) { -uint src, idx, slot, samepage, entry; +uint src, idx, slot, samepage, entry, avail, que = 0; AtomicKey *head, *tail, *leaf; BtPageSet set[1], prev[1]; unsigned char value[BtId]; @@ -2717,6 +2982,7 @@ BtLatchSet *latch; AtomicTxn *locks; int result = 0; BtSlot temp[1]; +logseqno lsn; BtPage page; BtVal *val; uid right; @@ -2743,6 +3009,11 @@ int type; } } + // reserve enough buffer pool entries + + avail = source->cnt * 3 + bt->mgr->pagezero->alloc->lvl + 1; + bt_availrequest (bt, avail); + // Load the leaf page for each key // group same page references with reuse bit // and determine any constraint violations @@ -2782,6 +3053,12 @@ int type; locks[src].reuse = 1; } + // capture current lsn for master page + + locks[src].reqlsn = set->page->lsn; + + // perform constraint checks + switch( slotptr(source, src)->type ) { case Duplicate: case Unique: @@ -2816,29 +3093,12 @@ int type; // and add entries to redo log - for( src = 0; src++ < source->cnt; ) { - key = keyptr(source, src); - val = valptr(source, src); - switch( slotptr(source, src)->type ) { - case Unique: - type = BTRM_add; - break; - case Duplicate: - type = BTRM_dup; - break; - case Delete: - type = BTRM_del; - break; - case Update: - type = BTRM_upd; - break; - } - - if( locks[src].lsn = bt_newredo (bt, type, 0, key, val) ) - continue; - - goto atomicerr; - } + if( bt->mgr->redopages ) + if( lsn = bt_txnredo (bt, source) ) + for( src = 0; src++ < source->cnt; ) + locks[src].lsn = lsn; + else + goto atomicerr; // obtain write lock for each master page @@ -2928,7 +3188,7 @@ int type; // schedule prev fence key update ptr = keyptr(prev->page,prev->page->cnt); - leaf = calloc (sizeof(AtomicKey), 1); + leaf = calloc (sizeof(AtomicKey), 1), que++; memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; @@ -2958,7 +3218,7 @@ int type; // far right sibling or set rightmost page in page zero if( right = bt_getid (prev->page->right) ) { - if( set->latch = bt_pinlatch (bt, right, 1) ) + if( set->latch = bt_pinlatch (bt, right, NULL) ) set->page = bt_mappage (bt, set->latch); else goto atomicerr; @@ -2969,15 +3229,15 @@ int type; bt_unlockpage (bt, BtLockWrite, set->latch); bt_unpinlatch (set->latch); } else { // prev is rightmost page - bt_spinwritelock (bt->mgr->lock); + bt_mutexlock (bt->mgr->lock); bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); - bt_spinreleasewrite(bt->mgr->lock); + bt_releasemutex(bt->mgr->lock); } // Process last page split in chain ptr = keyptr(prev->page,prev->page->cnt); - leaf = calloc (sizeof(AtomicKey), 1); + leaf = calloc (sizeof(AtomicKey), 1), que++; memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; @@ -3023,7 +3283,7 @@ int type; // perform the remainder of the delete // from the FIFO queue - leaf = calloc (sizeof(AtomicKey), 1); + leaf = calloc (sizeof(AtomicKey), 1), que++; memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; @@ -3044,6 +3304,11 @@ int type; bt_unlockpage(bt, BtLockWrite, prev->latch); } + bt_availrelease (bt, avail); + + que *= bt->mgr->pagezero->alloc->lvl; + bt_availrequest (bt, que); + // add & delete keys for any pages split or merged during transaction if( leaf = head ) @@ -3082,6 +3347,8 @@ int type; free (leaf); } while( leaf = tail ); + bt_availrelease (bt, que); + // return success free (locks); @@ -3097,7 +3364,7 @@ uint bt_lastkey (BtDb *bt) uid page_no = bt_getid (bt->mgr->pagezero->alloc->left); BtPageSet set[1]; - if( set->latch = bt_pinlatch (bt, page_no, 1) ) + if( set->latch = bt_pinlatch (bt, page_no, NULL) ) set->page = bt_mappage (bt, set->latch); else return 0; @@ -3130,7 +3397,7 @@ goleft: findourself: bt->cursor_page = next; - if( set->latch = bt_pinlatch (bt, next, 1) ) + if( set->latch = bt_pinlatch (bt, next, NULL) ) set->page = bt_mappage (bt, set->latch); else return 0; @@ -3178,7 +3445,7 @@ uid right; bt->cursor_page = right; - if( set->latch = bt_pinlatch (bt, right, 1) ) + if( set->latch = bt_pinlatch (bt, right, NULL) ) set->page = bt_mappage (bt, set->latch); else return 0; @@ -3293,102 +3560,29 @@ struct timeval tv[1]; void bt_poolaudit (BtMgr *mgr) { BtLatchSet *latch; -uint slot = 0; - - while( slot++ < mgr->latchdeployed ) { - latch = mgr->latchsets + slot; - - if( *latch->readwr->rin & MASK ) - fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no); - memset ((ushort *)latch->readwr, 0, sizeof(RWLock)); - - if( *latch->access->rin & MASK ) - fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no); - memset ((ushort *)latch->access, 0, sizeof(RWLock)); - - if( *latch->parent->ticket != *latch->parent->serving ) - fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no); - memset ((ushort *)latch->parent, 0, sizeof(RWLock)); - - if( latch->pin & ~CLOCK_bit ) { - fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no); - latch->pin = 0; - } - } -} - -uint bt_latchaudit (BtDb *bt) -{ -ushort idx, hashidx; -uid next, page_no; -BtLatchSet *latch; -uint cnt = 0; -BtKey *ptr; +uint entry = 0; - if( *(ushort *)(bt->mgr->lock) ) - fprintf(stderr, "Alloc page locked\n"); - *(ushort *)(bt->mgr->lock) = 0; + while( ++entry < mgr->latchtotal ) { + latch = mgr->latchsets + entry; - for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) { - latch = bt->mgr->latchsets + idx; if( *latch->readwr->rin & MASK ) - fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no); - memset ((ushort *)latch->readwr, 0, sizeof(RWLock)); + fprintf(stderr, "latchset %d rwlocked for page %d\n", entry, latch->page_no); if( *latch->access->rin & MASK ) - fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no); - memset ((ushort *)latch->access, 0, sizeof(RWLock)); - - if( *latch->parent->ticket != *latch->parent->serving ) - fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no); - memset ((ushort *)latch->parent, 0, sizeof(RWLock)); - - if( latch->pin ) { - fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no); - latch->pin = 0; - } - } - - for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) { - if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) ) - fprintf(stderr, "hash entry %d locked\n", hashidx); - - *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0; - - if( idx = bt->mgr->hashtable[hashidx].slot ) do { - latch = bt->mgr->latchsets + idx; - if( latch->pin ) - fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no); - } while( idx = latch->next ); - } - - page_no = LEAF_page; + fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no); - while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) { - uid off = page_no << bt->mgr->page_bits; -#ifdef unix - pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off); -#else - DWORD amt[1]; + if( *latch->parent->exclusive ) + fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no); - SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN); + if( *latch->atomic->exclusive ) + fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no); - if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL)) - return bt->err = BTERR_map; + if( *latch->modify->exclusive ) + fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no); - if( *amt < bt->mgr->page_size ) - return bt->err = BTERR_map; -#endif - if( !bt->frame->free && !bt->frame->lvl ) - cnt += bt->frame->act; - page_no++; + if( latch->pin & ~CLOCK_bit ) + fprintf(stderr, "latchset %d pinned %d times for page %d\n", entry, latch->pin & ~CLOCK_bit, latch->page_no); } - - cnt--; // remove stopper key - fprintf(stderr, " Total keys read %d\n", cnt); - - bt_close (bt); - return 0; } typedef struct { @@ -3432,12 +3626,6 @@ FILE *in; switch(ch | 0x20) { - case 'a': - fprintf(stderr, "started latch mgr audit\n"); - cnt = bt_latchaudit (bt); - fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt); - break; - case 'd': type = Delete; @@ -3464,7 +3652,7 @@ FILE *in; if( !args->num ) { if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) ) - fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); + fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0); len = 0; continue; } @@ -3489,7 +3677,7 @@ FILE *in; page->min = nxt; if( bt_atomictxn (bt, page) ) - fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); + fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0); nxt = sizeof(txn); cnt = 0; @@ -3508,7 +3696,7 @@ FILE *in; line++; if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) ) - fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); + fprintf(stderr, "Error %d Line: %d source: %d\n", bt->err, bt->line, line), exit(0); len = 0; } else if( len < BT_maxkey ) @@ -3526,7 +3714,7 @@ FILE *in; if( bt_findkey (bt, key, len, NULL, 0) == 0 ) found++; else if( bt->err ) - fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0); + fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->err, errno, bt->line, line), exit(0); len = 0; } else if( len < BT_maxkey ) @@ -3536,8 +3724,9 @@ FILE *in; case 's': fprintf(stderr, "started scanning\n"); + do { - if( set->latch = bt_pinlatch (bt, page_no, 1) ) + if( set->latch = bt_pinlatch (bt, page_no, NULL) ) set->page = bt_mappage (bt, set->latch); else fprintf(stderr, "unable to obtain latch"), exit(1); @@ -3560,6 +3749,7 @@ FILE *in; cnt++; } + set->latch->avail = 1; bt_unlockpage (bt, BtLockRead, set->latch); bt_unpinlatch (set->latch); } while( page_no = next ); -- 2.40.0