X-Git-Url: https://pd.if.org/git/?a=blobdiff_plain;f=threadskv9c.c;h=974beeb5c93ad244ae575cd3fb89886556ca9bb1;hb=172ab9f8496947eb879883777aeaf3f4b17f3522;hp=6029526f8108525eab3a969e9e0a85253822a323;hpb=4be215184b59e4f8dfebc0fa25a7a83dfcfed39c;p=btree diff --git a/threadskv9c.c b/threadskv9c.c index 6029526..974beeb 100644 --- a/threadskv9c.c +++ b/threadskv9c.c @@ -1,4 +1,4 @@ -// btree version threadskv9c FUTEX version +// btree version threadskv9c sched_yield version // with reworked bt_deletekey code, // phase-fair reader writer lock, // librarian page split code, @@ -109,6 +109,28 @@ typedef enum{ BtLockAtomic = 32 } BtLock; +// lite weight mutex + +typedef struct { + union { + struct { + uint xlock:1; // one writer has exclusive lock + uint wrt:31; // count of other writers waiting + } bits[1]; + uint value[1]; + }; +} BtMutexLatch; + +#define XCL 1 +#define WRT 2 + +// mode & definition for lite latch implementation + +enum { + QueRd = 1, // reader queue + QueWr = 2 // writer queue +} RWQueue; + // definition for phase-fair reader/writer lock implementation typedef struct { @@ -123,7 +145,7 @@ typedef struct { // write only lock typedef struct { - volatile uint exclusive[1]; + BtMutexLatch xcl[1]; ushort tid; ushort dup; } WOLock; @@ -133,27 +155,10 @@ typedef struct { #define MASK 0x3 #define RINC 0x4 -// lite weight mutex - -// exclusive is set for write access - -typedef struct { - volatile uint exclusive[1]; -} BtMutexLatch; - -#define XCL 1 - -// mode & definition for lite latch implementation - -enum { - QueRd = 1, // reader queue - QueWr = 2 // writer queue -} RWQueue; - // hash table entries typedef struct { - volatile uint entry; // Latch table entry at head of chain + uint entry; // Latch table entry at head of chain BtMutexLatch latch[1]; } BtHashEntry; @@ -464,33 +469,76 @@ uid bt_newdup (BtDb *bt) #endif } +// lite weight spin lock Latch Manager + +int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3) +{ + return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); +} + +void bt_mutexlock(BtMutexLatch *latch) +{ +BtMutexLatch prev[1]; +uint slept = 0; + + while( 1 ) { + *prev->value = __sync_fetch_and_or(latch->value, XCL); + + if( !prev->bits->xlock ) { // did we set XCL bit? + if( slept ) + __sync_fetch_and_sub(latch->value, WRT); + return; + } + + if( !slept ) { + prev->bits->wrt++; + __sync_fetch_and_add(latch->value, WRT); + } + + sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr); + slept = 1; + } +} + +// try to obtain write lock + +// return 1 if obtained, +// 0 otherwise + +int bt_mutextry(BtMutexLatch *latch) +{ +BtMutexLatch prev[1]; + + *prev->value = __sync_fetch_and_or(latch->value, XCL); + + // take write access if exclusive bit is clear + + return !prev->bits->xlock; +} + +// clear write mode + +void bt_releasemutex(BtMutexLatch *latch) +{ +BtMutexLatch prev[1]; + + *prev->value = __sync_fetch_and_and(latch->value, ~XCL); + + if( prev->bits->wrt ) + sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ); +} + // Write-Only Queue Lock void WriteOLock (WOLock *lock, ushort tid) { -uint prev; - if( lock->tid == tid ) { lock->dup++; return; } - while( 1 ) { -#ifdef unix - prev = __sync_fetch_and_or (lock->exclusive, 1); -#else - prev = _InterlockedExchangeOr (lock->exclusive, 1); -#endif - if( !(prev & XCL) ) { - lock->tid = tid; - return; - } -#ifdef unix - sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr ); -#else - SwitchToThread (); -#endif - } + bt_mutexlock(lock->xcl); + lock->tid = tid; } void WriteORelease (WOLock *lock) @@ -500,11 +548,8 @@ void WriteORelease (WOLock *lock) return; } - *lock->exclusive = 0; lock->tid = 0; -#ifdef linux - sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ); -#endif + bt_releasemutex(lock->xcl); } // Phase-Fair reader/writer lock implementation @@ -628,64 +673,6 @@ void ReadRelease (RWLock *lock) #endif } -// lite weight spin lock Latch Manager - -int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3) -{ - return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); -} - -void bt_mutexlock(BtMutexLatch *latch) -{ -uint prev; - - while( 1 ) { -#ifdef unix - prev = __sync_fetch_and_or(latch->exclusive, XCL); -#else - prev = _InterlockedOr(latch->exclusive, XCL); -#endif - if( !(prev & XCL) ) - return; -#ifdef unix - sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr ); -#else - SwitchToThread(); -#endif - } -} - -// try to obtain write lock - -// return 1 if obtained, -// 0 otherwise - -int bt_mutextry(BtMutexLatch *latch) -{ -uint prev; - -#ifdef unix - prev = __sync_fetch_and_or(latch->exclusive, XCL); -#else - prev = _InterlockedOr(latch->exclusive, XCL); -#endif - // take write access if exclusive bit is clear - - return !(prev & XCL); -} - -// clear write mode - -void bt_releasemutex(BtMutexLatch *latch) -{ - *latch->exclusive = 0; -#ifdef unix - sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr ); -#endif -} - -// recovery manager -- flush dirty pages - void bt_flushlsn (BtDb *bt) { uint cnt3 = 0, cnt2 = 0, cnt = 0; @@ -1327,13 +1314,15 @@ uint slot; fdatasync (mgr->idx); - eof = (BtLogHdr *)mgr->redobuff; - memset (eof, 0, sizeof(BtLogHdr)); - eof->lsn = mgr->lsn; + if( mgr->redopages ) { + eof = (BtLogHdr *)mgr->redobuff; + memset (eof, 0, sizeof(BtLogHdr)); + eof->lsn = mgr->lsn; - pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits); + pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits); - sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER); + sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER); + } fprintf(stderr, "%d buffer pool pages flushed\n", num); @@ -1348,11 +1337,13 @@ uint slot; CloseHandle(mgr->hpool); #endif #ifdef unix - free (mgr->redobuff); + if( mgr->redopages ) + free (mgr->redobuff); close (mgr->idx); free (mgr); #else - VirtualFree (mgr->redobuff, 0, MEM_RELEASE); + if( mgr->redopages ) + VirtualFree (mgr->redobuff, 0, MEM_RELEASE); FlushFileBuffers(mgr->idx); CloseHandle(mgr->idx); GlobalFree (mgr); @@ -3571,13 +3562,13 @@ uint entry = 0; if( *latch->access->rin & MASK ) fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no); - if( *latch->parent->exclusive ) + if( *latch->parent->xcl->value ) fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no); - if( *latch->atomic->exclusive ) + if( *latch->atomic->xcl->value ) fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no); - if( *latch->modify->exclusive ) + if( *latch->modify->value ) fprintf(stderr, "latchset %d modifylocked for page %d\n", entry, latch->page_no); if( latch->pin & ~CLOCK_bit ) @@ -3785,7 +3776,7 @@ FILE *in; posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL); #endif fprintf(stderr, "started counting\n"); - page_no = LEAF_page; + next = LEAF_page + bt->mgr->redopages + 1; while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) { if( bt_readpage (bt->mgr, bt->frame, page_no) ) @@ -3795,7 +3786,7 @@ FILE *in; cnt += bt->frame->act; bt->reads++; - page_no++; + page_no = next++; } cnt--; // remove stopper key