]> pd.if.org Git - btree/commitdiff
Fix concurrency bug with CLOCK_bit and latch->pin
authorunknown <karl@E04.petzent.com>
Tue, 16 Sep 2014 23:47:39 +0000 (16:47 -0700)
committerunknown <karl@E04.petzent.com>
Tue, 16 Sep 2014 23:47:39 +0000 (16:47 -0700)
threadskv6.c

index ce037105c8055c674d3df8ed7c15a4852784914a..627a3786570d02b2833cd8f438a82f65b3d6257d 100644 (file)
@@ -145,9 +145,10 @@ typedef struct {
        RWLock readwr[1];               // read/write page lock
        RWLock access[1];               // Access Intent/Page delete
        RWLock parent[1];               // Posting of fence key in parent
        RWLock readwr[1];               // read/write page lock
        RWLock access[1];               // Access Intent/Page delete
        RWLock parent[1];               // Posting of fence key in parent
-       volatile uint next;             // next entry in hash table chain
-       volatile uint prev;             // prev entry in hash table chain
-       volatile ushort pin;    // number of outstanding latches
+       uint slot;                              // entry slot in latch table
+       uint next;                              // next entry in hash table chain
+       uint prev;                              // prev entry in hash table chain
+       volatile ushort pin;    // number of outstanding threads
        ushort dirty:1;                 // page in cache is dirty
 } BtLatchSet;
 
        ushort dirty:1;                 // page in cache is dirty
 } BtLatchSet;
 
@@ -634,16 +635,17 @@ uint amt[1];
 
 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
 {
 
 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
 {
-BtPage page = (BtPage)((uid)slot * bt->mgr->page_size + bt->mgr->pagepool);
+BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
 BtLatchSet *latch = bt->mgr->latchsets + slot;
 
        if( latch->next = bt->mgr->hashtable[hashidx].slot )
                bt->mgr->latchsets[latch->next].prev = slot;
 
        bt->mgr->hashtable[hashidx].slot = slot;
 BtLatchSet *latch = bt->mgr->latchsets + slot;
 
        if( latch->next = bt->mgr->hashtable[hashidx].slot )
                bt->mgr->latchsets[latch->next].prev = slot;
 
        bt->mgr->hashtable[hashidx].slot = slot;
-       latch->pin = CLOCK_bit | 1;     // initialize clock
        latch->page_no = page_no;
        latch->page_no = page_no;
+       latch->slot = slot;
        latch->prev = 0;
        latch->prev = 0;
+       latch->pin = 1;
 
        if( loadit )
          if( bt->err = bt_readpage (bt->mgr, page, page_no) )
 
        if( loadit )
          if( bt->err = bt_readpage (bt->mgr, page, page_no) )
@@ -654,22 +656,27 @@ BtLatchSet *latch = bt->mgr->latchsets + slot;
        return bt->err = 0;
 }
 
        return bt->err = 0;
 }
 
-//     release latch pin
+//     set CLOCK bit in latch
+//     decrement pin count
 
 
-void bt_unpinlatch (BtLatchSet *set)
+void bt_unpinlatch (BtLatchSet *latch)
 {
 #ifdef unix
 {
 #ifdef unix
-       __sync_fetch_and_add(&set->pin, -1);
+       if( ~latch->pin & CLOCK_bit )
+               __sync_fetch_and_or(&latch->pin, CLOCK_bit);
+       __sync_fetch_and_add(&latch->pin, -1);
 #else
 #else
-       _InterlockedDecrement16 (&set->pin);
+       if( ~latch->pin & CLOCK_bit )
+               _InterlockedOr16 (&latch->pin, CLOCK_bit);
+       _InterlockedDecrement16 (&latch->pin);
 #endif
 }
 
 #endif
 }
 
-//  map the btree cached page onto current page
+//  return the btree cached page address
 
 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
 {
 
 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
 {
-BtPage page = (BtPage)((uid)(latch - bt->mgr->latchsets) * bt->mgr->page_size + bt->mgr->pagepool);
+BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool);
 
        return page;
 }
 
        return page;
 }
@@ -705,10 +712,8 @@ BtPage page;
        latch = bt->mgr->latchsets + slot;
 #ifdef unix
        __sync_fetch_and_add(&latch->pin, 1);
        latch = bt->mgr->latchsets + slot;
 #ifdef unix
        __sync_fetch_and_add(&latch->pin, 1);
-       __sync_fetch_and_or(&latch->pin, CLOCK_bit);
 #else
        _InterlockedIncrement16 (&latch->pin);
 #else
        _InterlockedIncrement16 (&latch->pin);
-       _InterlockedOr16 (&latch->pin, CLOCK_bit);
 #endif
        bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
        return latch;
 #endif
        bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
        return latch;
@@ -762,19 +767,24 @@ BtPage page;
        if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
                continue;
 
        if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
                continue;
 
-       if( latch->pin & CLOCK_bit ) {
+       //  skip this slot if it is pinned
+       //      or the CLOCK bit is set
+
+       if( latch->pin ) {
+         if( latch->pin & CLOCK_bit ) {
 #ifdef unix
 #ifdef unix
-               __sync_fetch_and_add(&latch->pin, -CLOCK_bit);
+               __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
 #else
 #else
-               _InterlockedExchangeAdd16 (&latch->pin, -CLOCK_bit);
+               _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
 #endif
 #endif
+         }
          bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
          continue;
        }
 
          bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
          continue;
        }
 
-       //  update permanent page area in btree
+       //  update permanent page area in btree from buffer pool
 
 
-       page = (BtPage)((uid)slot * bt->mgr->page_size + bt->mgr->pagepool);
+       page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
 
        if( latch->dirty )
          if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
 
        if( latch->dirty )
          if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
@@ -812,19 +822,20 @@ uint slot;
        //      flush dirty pool pages to the btree
 
        for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
        //      flush dirty pool pages to the btree
 
        for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
-               page = (BtPage)((uid)slot * mgr->page_size + mgr->pagepool);
+               page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
                latch = mgr->latchsets + slot;
 
                if( latch->dirty ) {
                        bt_writepage(mgr, page, latch->page_no);
                        latch->dirty = 0, num++;
                }
                latch = mgr->latchsets + slot;
 
                if( latch->dirty ) {
                        bt_writepage(mgr, page, latch->page_no);
                        latch->dirty = 0, num++;
                }
+//             madvise (page, mgr->page_size, MADV_DONTNEED);
        }
 
        fprintf(stderr, "%d buffer pool pages flushed\n", num);
 
 #ifdef unix
        }
 
        fprintf(stderr, "%d buffer pool pages flushed\n", num);
 
 #ifdef unix
-       munmap (mgr->hashtable, mgr->nlatchpage * mgr->page_size);
+       munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
        munmap (mgr->pagezero, mgr->page_size);
 #else
        FlushViewOfFile(mgr->pagezero, 0);
        munmap (mgr->pagezero, mgr->page_size);
 #else
        FlushViewOfFile(mgr->pagezero, 0);
@@ -941,7 +952,7 @@ BtVal *val;
        //  calculate number of latch hash table entries
 
        mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
        //  calculate number of latch hash table entries
 
        mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
-       mgr->latchhash = mgr->nlatchpage * mgr->page_size / sizeof(BtHashEntry);
+       mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
 
        mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
        mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
 
        mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
        mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
@@ -1003,14 +1014,14 @@ mgrlatch:
        // mlock the pagezero page
 
        flag = PROT_READ | PROT_WRITE;
        // mlock the pagezero page
 
        flag = PROT_READ | PROT_WRITE;
-       mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
+       mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
        if( mgr->pagezero == MAP_FAILED ) {
                fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
        }
        mlock (mgr->pagezero, mgr->page_size);
 
        if( mgr->pagezero == MAP_FAILED ) {
                fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
        }
        mlock (mgr->pagezero, mgr->page_size);
 
-       mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage * mgr->page_size, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
+       mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
        if( mgr->hashtable == MAP_FAILED ) {
                fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
        if( mgr->hashtable == MAP_FAILED ) {
                fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
@@ -1046,7 +1057,7 @@ mgrlatch:
        }
 #endif
 
        }
 #endif
 
-       mgr->pagepool = (unsigned char *)mgr->hashtable + (uid)(mgr->nlatchpage - mgr->latchtotal) * mgr->page_size;
+       mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
        mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
 
        return mgr;
        mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
 
        return mgr;