-// btree version threadskv9c FUTEX version
+// btree version threadskv9c sched_yield version
// with reworked bt_deletekey code,
// phase-fair reader writer lock,
// librarian page split code,
// write only lock
typedef struct {
- volatile uint exclusive[1];
+ volatile ushort ticket[1];
+ volatile ushort serving[1];
ushort tid;
ushort dup;
} WOLock;
// exclusive is set for write access
typedef struct {
- volatile uint exclusive[1];
+ volatile unsigned char exclusive[1];
+ unsigned char filler;
} BtMutexLatch;
#define XCL 1
// hash table entries
typedef struct {
- volatile uint entry; // Latch table entry at head of chain
+ uint entry; // Latch table entry at head of chain
BtMutexLatch latch[1];
} BtHashEntry;
void WriteOLock (WOLock *lock, ushort tid)
{
-uint prev;
+ushort tix;
if( lock->tid == tid ) {
lock->dup++;
return;
}
-
- while( 1 ) {
#ifdef unix
- prev = __sync_fetch_and_or (lock->exclusive, 1);
+ tix = __sync_fetch_and_add (lock->ticket, 1);
#else
- prev = _InterlockedExchangeOr (lock->exclusive, 1);
+ tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
#endif
- if( !(prev & XCL) ) {
- lock->tid = tid;
- return;
- }
+ // wait for our ticket to come up
+
+ while( tix != lock->serving[0] )
#ifdef unix
- sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
+ sched_yield();
#else
- SwitchToThread ();
+ SwitchToThread ();
#endif
- }
+ lock->tid = tid;
}
void WriteORelease (WOLock *lock)
return;
}
- *lock->exclusive = 0;
lock->tid = 0;
-#ifdef linux
- sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
-#endif
+ lock->serving[0]++;
}
// Phase-Fair reader/writer lock implementation
void bt_mutexlock(BtMutexLatch *latch)
{
-uint prev;
+unsigned char prev;
- while( 1 ) {
+ do {
#ifdef unix
prev = __sync_fetch_and_or(latch->exclusive, XCL);
#else
- prev = _InterlockedOr(latch->exclusive, XCL);
+ prev = _InterlockedOr8(latch->exclusive, XCL);
#endif
if( !(prev & XCL) )
return;
#ifdef unix
- sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
+ } while( sched_yield(), 1 );
#else
- SwitchToThread();
+ } while( SwitchToThread(), 1 );
#endif
- }
}
// try to obtain write lock
#ifdef unix
prev = __sync_fetch_and_or(latch->exclusive, XCL);
#else
- prev = _InterlockedOr(latch->exclusive, XCL);
+ prev = _InterlockedOr8(latch->exclusive, XCL);
#endif
// take write access if exclusive bit is clear
void bt_releasemutex(BtMutexLatch *latch)
{
*latch->exclusive = 0;
-#ifdef unix
- sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
-#endif
}
// recovery manager -- flush dirty pages
fdatasync (mgr->idx);
- eof = (BtLogHdr *)mgr->redobuff;
- memset (eof, 0, sizeof(BtLogHdr));
- eof->lsn = mgr->lsn;
+ if( mgr->redopages ) {
+ eof = (BtLogHdr *)mgr->redobuff;
+ memset (eof, 0, sizeof(BtLogHdr));
+ eof->lsn = mgr->lsn;
- pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
+ pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
- sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+ sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+ }
fprintf(stderr, "%d buffer pool pages flushed\n", num);
CloseHandle(mgr->hpool);
#endif
#ifdef unix
- free (mgr->redobuff);
+ if( mgr->redopages )
+ free (mgr->redobuff);
close (mgr->idx);
free (mgr);
#else
- VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
+ if( mgr->redopages )
+ VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
FlushFileBuffers(mgr->idx);
CloseHandle(mgr->idx);
GlobalFree (mgr);
if( *latch->access->rin & MASK )
fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
- if( *latch->parent->exclusive )
+ if( *latch->parent->ticket != *latch->parent->serving )
fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
- if( *latch->atomic->exclusive )
+ if( *latch->atomic->ticket != *latch->atomic->serving )
fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
if( *latch->modify->exclusive )
posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
#endif
fprintf(stderr, "started counting\n");
- page_no = LEAF_page;
+ next = LEAF_page + bt->mgr->redopages + 1;
while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
if( bt_readpage (bt->mgr, bt->frame, page_no) )
cnt += bt->frame->act;
bt->reads++;
- page_no++;
+ page_no = next++;
}
cnt--; // remove stopper key