]> pd.if.org Git - btree/commitdiff
Replace futex latches with spin latches
authorunknown <karl@E04.petzent.com>
Fri, 10 Oct 2014 21:25:03 +0000 (14:25 -0700)
committerunknown <karl@E04.petzent.com>
Fri, 10 Oct 2014 21:25:03 +0000 (14:25 -0700)
threadskv9c.c

index 6029526f8108525eab3a969e9e0a85253822a323..c13577a7e632bd39f16c1c0e427d798196a0af55 100644 (file)
@@ -1,4 +1,4 @@
-// btree version threadskv9c FUTEX version
+// btree version threadskv9c sched_yield version
 //     with reworked bt_deletekey code,
 //     phase-fair reader writer lock,
 //     librarian page split code,
@@ -123,7 +123,8 @@ typedef struct {
 //     write only lock
 
 typedef struct {
-       volatile uint exclusive[1];
+       volatile ushort ticket[1];
+       volatile ushort serving[1];
        ushort tid;
        ushort dup;
 } WOLock;
@@ -138,7 +139,8 @@ typedef struct {
 // exclusive is set for write access
 
 typedef struct {
-       volatile uint exclusive[1];
+       volatile unsigned char exclusive[1];
+       unsigned char filler;
 } BtMutexLatch;
 
 #define XCL 1
@@ -153,7 +155,7 @@ enum {
 //  hash table entries
 
 typedef struct {
-       volatile uint entry;            // Latch table entry at head of chain
+       uint entry;             // Latch table entry at head of chain
        BtMutexLatch latch[1];
 } BtHashEntry;
 
@@ -468,29 +470,26 @@ uid bt_newdup (BtDb *bt)
 
 void WriteOLock (WOLock *lock, ushort tid)
 {
-uint prev;
+ushort tix;
 
        if( lock->tid == tid ) {
                lock->dup++;
                return;
        }
-
-       while( 1 ) {
 #ifdef unix
-         prev = __sync_fetch_and_or (lock->exclusive, 1);
+       tix = __sync_fetch_and_add (lock->ticket, 1);
 #else
-         prev = _InterlockedExchangeOr (lock->exclusive, 1);
+       tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
 #endif
-         if( !(prev & XCL) ) {
-               lock->tid = tid;
-               return;
-         }
+       // wait for our ticket to come up
+
+       while( tix != lock->serving[0] )
 #ifdef unix
-         sys_futex( (void *)lock->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
+               sched_yield();
 #else
-         SwitchToThread ();
+               SwitchToThread ();
 #endif
-       }
+       lock->tid = tid;
 }
 
 void WriteORelease (WOLock *lock)
@@ -500,11 +499,8 @@ void WriteORelease (WOLock *lock)
                return;
        }
 
-       *lock->exclusive = 0;
        lock->tid = 0;
-#ifdef linux
-       sys_futex( (void *)lock->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
-#endif
+       lock->serving[0]++;
 }
 
 //     Phase-Fair reader/writer lock implementation
@@ -637,22 +633,21 @@ int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *add
 
 void bt_mutexlock(BtMutexLatch *latch)
 {
-uint prev;
+unsigned char prev;
 
-  while( 1 ) {
+  do {
 #ifdef  unix
        prev = __sync_fetch_and_or(latch->exclusive, XCL);
 #else
-       prev = _InterlockedOr(latch->exclusive, XCL);
+       prev = _InterlockedOr8(latch->exclusive, XCL);
 #endif
        if( !(prev & XCL) )
                return;
 #ifdef  unix
-       sys_futex( (void *)latch->exclusive, FUTEX_WAIT_BITSET, prev, NULL, NULL, QueWr );
+  } while( sched_yield(), 1 );
 #else
-       SwitchToThread();
+  } while( SwitchToThread(), 1 );
 #endif
-  }
 }
 
 //     try to obtain write lock
@@ -667,7 +662,7 @@ uint prev;
 #ifdef  unix
        prev = __sync_fetch_and_or(latch->exclusive, XCL);
 #else
-       prev = _InterlockedOr(latch->exclusive, XCL);
+       prev = _InterlockedOr8(latch->exclusive, XCL);
 #endif
        //      take write access if exclusive bit is clear
 
@@ -679,9 +674,6 @@ uint prev;
 void bt_releasemutex(BtMutexLatch *latch)
 {
        *latch->exclusive = 0;
-#ifdef unix
-       sys_futex( (void *)latch->exclusive, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
-#endif
 }
 
 //     recovery manager -- flush dirty pages
@@ -1327,13 +1319,15 @@ uint slot;
 
        fdatasync (mgr->idx);
 
-       eof = (BtLogHdr *)mgr->redobuff;
-       memset (eof, 0, sizeof(BtLogHdr));
-       eof->lsn = mgr->lsn;
+       if( mgr->redopages ) {
+               eof = (BtLogHdr *)mgr->redobuff;
+               memset (eof, 0, sizeof(BtLogHdr));
+               eof->lsn = mgr->lsn;
 
-       pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
+               pwrite (mgr->idx, mgr->redobuff, sizeof(BtLogHdr), REDO_page << mgr->page_bits);
 
-       sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+               sync_file_range (mgr->idx, REDO_page << mgr->page_bits, sizeof(BtLogHdr), SYNC_FILE_RANGE_WAIT_AFTER);
+       }
 
        fprintf(stderr, "%d buffer pool pages flushed\n", num);
 
@@ -1348,11 +1342,13 @@ uint slot;
        CloseHandle(mgr->hpool);
 #endif
 #ifdef unix
-       free (mgr->redobuff);
+       if( mgr->redopages )
+               free (mgr->redobuff);
        close (mgr->idx);
        free (mgr);
 #else
-       VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
+       if( mgr->redopages )
+               VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
        FlushFileBuffers(mgr->idx);
        CloseHandle(mgr->idx);
        GlobalFree (mgr);
@@ -3571,10 +3567,10 @@ uint entry = 0;
                if( *latch->access->rin & MASK )
                        fprintf(stderr, "latchset %d accesslocked for page %d\n", entry, latch->page_no);
 
-               if( *latch->parent->exclusive )
+               if( *latch->parent->ticket != *latch->parent->serving )
                        fprintf(stderr, "latchset %d parentlocked for page %d\n", entry, latch->page_no);
 
-               if( *latch->atomic->exclusive )
+               if( *latch->atomic->ticket != *latch->atomic->serving )
                        fprintf(stderr, "latchset %d atomiclocked for page %d\n", entry, latch->page_no);
 
                if( *latch->modify->exclusive )
@@ -3785,7 +3781,7 @@ FILE *in;
                posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
 #endif
                fprintf(stderr, "started counting\n");
-               page_no = LEAF_page;
+               next = LEAF_page + bt->mgr->redopages + 1;
 
                while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
                        if( bt_readpage (bt->mgr, bt->frame, page_no) )
@@ -3795,7 +3791,7 @@ FILE *in;
                                cnt += bt->frame->act;
 
                        bt->reads++;
-                       page_no++;
+                       page_no = next++;
                }
                
                cnt--;  // remove stopper key