-// btree version threadskv9c FUTEX version
+// btree version threadskv9c sched_yield version
// with reworked bt_deletekey code,
// phase-fair reader writer lock,
// librarian page split code,
BtLockAtomic = 32
} BtLock;
+// lite weight mutex
+
+typedef struct {
+ union {
+ struct {
+ uint xlock:1; // one writer has exclusive lock
+ uint wrt:31; // count of other writers waiting
+ } bits[1];
+ uint value[1];
+ };
+} BtMutexLatch;
+
+#define XCL 1
+#define WRT 2
+
+// mode & definition for lite latch implementation
+
+enum {
+ QueRd = 1, // reader queue
+ QueWr = 2 // writer queue
+} RWQueue;
+
// definition for phase-fair reader/writer lock implementation
typedef struct {
// write only lock
typedef struct {
- volatile uint exclusive[1];
+ BtMutexLatch xcl[1];
ushort tid;
ushort dup;
} WOLock;
#define MASK 0x3
#define RINC 0x4
-// lite weight mutex
-
-// exclusive is set for write access
-
-typedef struct {
- volatile uint exclusive[1];
-} BtMutexLatch;
-
-#define XCL 1
-
-// mode & definition for lite latch implementation
-
-enum {
- QueRd = 1, // reader queue
- QueWr = 2 // writer queue
-} RWQueue;
-
// hash table entries
typedef struct {
- volatile uint entry; // Latch table entry at head of chain
+ uint entry; // Latch table entry at head of chain
BtMutexLatch latch[1];
} BtHashEntry;
#endif
}
+// lite weight spin lock Latch Manager
+
+int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
+{
+ return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
+}
+
+void bt_mutexlock(BtMutexLatch *latch)
+{
+BtMutexLatch prev[1];
+uint slept = 0;
+
+ while( 1 ) {
+ *prev->value = __sync_fetch_and_or(latch->value, XCL);
+
+ if( !prev->bits->xlock ) { // did we set XCL bit?
+ if( slept )
+ __sync_fetch_and_sub(latch->value, WRT);
+ return;
+ }
+
+ if( !slept ) {
+ prev->bits->wrt++;
+ __sync_fetch_and_add(latch->value, WRT);
+ }
+
+ sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
+ slept = 1;
+ }
+}
+
+// try to obtain write lock
+
+// return 1 if obtained,
+// 0 otherwise
+
+int bt_mutextry(BtMutexLatch *latch)
+{
+BtMutexLatch prev[1];
+
+ *prev->value = __sync_fetch_and_or(latch->value, XCL);
+
+ // take write access if exclusive bit is clear
+
+ return !prev->bits->xlock;
+}
+
+// clear write mode
+
+void bt_releasemutex(BtMutexLatch *latch)
+{
+BtMutexLatch prev[1];
+
+ *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
+
+ if( prev->bits->wrt )
+ sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
+}
+
// Write-Only Queue Lock
void WriteOLock (WOLock *lock, ushort tid)
{
-uint prev;
-
if( lock->tid == tid ) {
lock->dup++;
return;
}
- while( 1 ) {
-#ifdef unix
- prev = __sync_fetch_and_or (lock->exclusive, 1);
-#else
- prev = _InterlockedExchangeOr (lock->exclusive, 1);
-#endif
- if( !(prev & XCL) ) {
- lock->tid = tid;
- return;
- }
-#ifdef unix
- sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
-#else
- SwitchToThread ();
-#endif
- }
+ bt_mutexlock(lock->xcl);
+ lock->tid = tid;
}
void WriteORelease (WOLock *lock)
return;
}
- *lock->exclusive = 0;
lock->tid = 0;
-#ifdef linux
- sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
-#endif
+ bt_releasemutex(lock->xcl);
}
// Phase-Fair reader/writer lock implementation
#endif
}
-// lite weight spin lock Latch Manager
-
-int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
-{
- return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
-}
-
-void bt_mutexlock(BtMutexLatch *latch)
-{
-uint prev;
-
- while( 1 ) {
-#ifdef unix
- prev = __sync_fetch_and_or(latch->exclusive, XCL);
-#else
- prev = _InterlockedOr(latch->exclusive, XCL);
-#endif
- if( !(prev & XCL) )
- return;
-#ifdef unix
- sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
-#else
- SwitchToThread();
-#endif
- }
-}
-
-// try to obtain write lock
-
-// return 1 if obtained,
-// 0 otherwise
-
-int bt_mutextry(BtMutexLatch *latch)
-{
-uint prev;
-
-#ifdef unix
- prev = __sync_fetch_and_or(latch->exclusive, XCL);
-#else
- prev = _InterlockedOr(latch->exclusive, XCL);
-#endif
- // take write access if exclusive bit is clear
-
- return !(prev & XCL);
-}
-
-// clear write mode
-
-void bt_releasemutex(BtMutexLatch *latch)
-{
- *latch->exclusive = 0;
-#ifdef unix
- sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
-#endif
-}
-
-// recovery manager -- flush dirty pages
-
void bt_flushlsn (BtDb *bt)
{
uint cnt3 = 0, cnt2 = 0, cnt = 0;
fdatasync (mgr->idx);
- eof = (BtLogHdr *)mgr->redobuff;
- memset (eof, 0, sizeof(BtLogHdr));
- eof->lsn = mgr->lsn;
+ if( mgr->redopages ) {
+ eof = (BtLogHdr *)mgr->redobuff;
+ memset (eof, 0, sizeof(BtLogHdr));
+ eof->lsn = mgr->lsn;
- pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
+ pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
- sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+ sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+ }
fprintf(stderr, "%d buffer pool pages flushed\n", num);
CloseHandle(mgr->hpool);
#endif
#ifdef unix
- free (mgr->redobuff);
+ if( mgr->redopages )
+ free (mgr->redobuff);
close (mgr->idx);
free (mgr);
#else
- VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
+ if( mgr->redopages )
+ VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
FlushFileBuffers(mgr->idx);
CloseHandle(mgr->idx);
GlobalFree (mgr);
if( *latch->access->rin & MASK )
fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
- if( *latch->parent->exclusive )
+ if( *latch->parent->xcl->value )
fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
- if( *latch->atomic->exclusive )
+ if( *latch->atomic->xcl->value )
fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
- if( *latch->modify->exclusive )
+ if( *latch->modify->value )
fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no);
if( latch->pin & ~CLOCK_bit )
posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
#endif
fprintf(stderr, "started counting\n");
- page_no = LEAF_page;
+ next = LEAF_page + bt->mgr->redopages + 1;
while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
if( bt_readpage (bt->mgr, bt->frame, page_no) )
cnt += bt->frame->act;
bt->reads++;
- page_no++;
+ page_no = next++;
}
cnt--; // remove stopper key