]> pd.if.org Git - btree/blobdiff - threadskv6.c
Fix small bug in main when there is less t han one input file
[btree] / threadskv6.c
index ce037105c8055c674d3df8ed7c15a4852784914a..9350ca5040fb5511f0a8da5e18a05e8b017faf58 100644 (file)
@@ -145,9 +145,10 @@ typedef struct {
        RWLock readwr[1];               // read/write page lock
        RWLock access[1];               // Access Intent/Page delete
        RWLock parent[1];               // Posting of fence key in parent
-       volatile uint next;             // next entry in hash table chain
-       volatile uint prev;             // prev entry in hash table chain
-       volatile ushort pin;    // number of outstanding latches
+       uint slot;                              // entry slot in latch table
+       uint next;                              // next entry in hash table chain
+       uint prev;                              // prev entry in hash table chain
+       volatile ushort pin;    // number of outstanding threads
        ushort dirty:1;                 // page in cache is dirty
 } BtLatchSet;
 
@@ -634,16 +635,17 @@ uint amt[1];
 
 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
 {
-BtPage page = (BtPage)((uid)slot * bt->mgr->page_size + bt->mgr->pagepool);
+BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
 BtLatchSet *latch = bt->mgr->latchsets + slot;
 
        if( latch->next = bt->mgr->hashtable[hashidx].slot )
                bt->mgr->latchsets[latch->next].prev = slot;
 
        bt->mgr->hashtable[hashidx].slot = slot;
-       latch->pin = CLOCK_bit | 1;     // initialize clock
        latch->page_no = page_no;
+       latch->slot = slot;
        latch->prev = 0;
+       latch->pin = 1;
 
        if( loadit )
          if( bt->err = bt_readpage (bt->mgr, page, page_no) )
@@ -654,22 +656,27 @@ BtLatchSet *latch = bt->mgr->latchsets + slot;
        return bt->err = 0;
 }
 
-//     release latch pin
+//     set CLOCK bit in latch
+//     decrement pin count
 
-void bt_unpinlatch (BtLatchSet *set)
+void bt_unpinlatch (BtLatchSet *latch)
 {
 #ifdef unix
-       __sync_fetch_and_add(&set->pin, -1);
+       if( ~latch->pin & CLOCK_bit )
+               __sync_fetch_and_or(&latch->pin, CLOCK_bit);
+       __sync_fetch_and_add(&latch->pin, -1);
 #else
-       _InterlockedDecrement16 (&set->pin);
+       if( ~latch->pin & CLOCK_bit )
+               _InterlockedOr16 (&latch->pin, CLOCK_bit);
+       _InterlockedDecrement16 (&latch->pin);
 #endif
 }
 
-//  map the btree cached page onto current page
+//  return the btree cached page address
 
 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
 {
-BtPage page = (BtPage)((uid)(latch - bt->mgr->latchsets) * bt->mgr->page_size + bt->mgr->pagepool);
+BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool);
 
        return page;
 }
@@ -705,10 +712,8 @@ BtPage page;
        latch = bt->mgr->latchsets + slot;
 #ifdef unix
        __sync_fetch_and_add(&latch->pin, 1);
-       __sync_fetch_and_or(&latch->pin, CLOCK_bit);
 #else
        _InterlockedIncrement16 (&latch->pin);
-       _InterlockedOr16 (&latch->pin, CLOCK_bit);
 #endif
        bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
        return latch;
@@ -762,19 +767,24 @@ BtPage page;
        if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
                continue;
 
-       if( latch->pin & CLOCK_bit ) {
+       //  skip this slot if it is pinned
+       //      or the CLOCK bit is set
+
+       if( latch->pin ) {
+         if( latch->pin & CLOCK_bit ) {
 #ifdef unix
-               __sync_fetch_and_add(&latch->pin, -CLOCK_bit);
+               __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
 #else
-               _InterlockedExchangeAdd16 (&latch->pin, -CLOCK_bit);
+               _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
 #endif
+         }
          bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
          continue;
        }
 
-       //  update permanent page area in btree
+       //  update permanent page area in btree from buffer pool
 
-       page = (BtPage)((uid)slot * bt->mgr->page_size + bt->mgr->pagepool);
+       page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
 
        if( latch->dirty )
          if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
@@ -812,19 +822,20 @@ uint slot;
        //      flush dirty pool pages to the btree
 
        for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
-               page = (BtPage)((uid)slot * mgr->page_size + mgr->pagepool);
+               page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
                latch = mgr->latchsets + slot;
 
                if( latch->dirty ) {
                        bt_writepage(mgr, page, latch->page_no);
                        latch->dirty = 0, num++;
                }
+//             madvise (page, mgr->page_size, MADV_DONTNEED);
        }
 
        fprintf(stderr, "%d buffer pool pages flushed\n", num);
 
 #ifdef unix
-       munmap (mgr->hashtable, mgr->nlatchpage * mgr->page_size);
+       munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
        munmap (mgr->pagezero, mgr->page_size);
 #else
        FlushViewOfFile(mgr->pagezero, 0);
@@ -941,7 +952,7 @@ BtVal *val;
        //  calculate number of latch hash table entries
 
        mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
-       mgr->latchhash = mgr->nlatchpage * mgr->page_size / sizeof(BtHashEntry);
+       mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
 
        mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
        mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
@@ -1003,14 +1014,14 @@ mgrlatch:
        // mlock the pagezero page
 
        flag = PROT_READ | PROT_WRITE;
-       mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
+       mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
        if( mgr->pagezero == MAP_FAILED ) {
                fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
        }
        mlock (mgr->pagezero, mgr->page_size);
 
-       mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage * mgr->page_size, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
+       mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
        if( mgr->hashtable == MAP_FAILED ) {
                fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
                return bt_mgrclose (mgr), NULL;
@@ -1046,7 +1057,7 @@ mgrlatch:
        }
 #endif
 
-       mgr->pagepool = (unsigned char *)mgr->hashtable + (uid)(mgr->nlatchpage - mgr->latchtotal) * mgr->page_size;
+       mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
        mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
 
        return mgr;
@@ -1739,7 +1750,9 @@ BtVal *val;
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 2;
-               if( cnt < max && slotptr(bt->frame,cnt)->dead )
+
+               if( cnt < max || page->lvl )
+                if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy the value across
@@ -1756,11 +1769,9 @@ BtVal *val;
 
                // make a librarian slot
 
-               if( idx ) {
-                       slotptr(page, ++idx)->off = nxt;
-                       slotptr(page, idx)->type = Librarian;
-                       slotptr(page, idx)->dead = 1;
-               }
+               slotptr(page, ++idx)->off = nxt;
+               slotptr(page, idx)->type = Librarian;
+               slotptr(page, idx)->dead = 1;
 
                // set up the slot
 
@@ -1874,8 +1885,10 @@ uint prev;
        idx = 0;
 
        while( cnt++ < max ) {
-               if( slotptr(set->page, cnt)->dead && cnt < max )
+               if( cnt < max || set->page->lvl )
+                 if( slotptr(set->page, cnt)->dead )
                        continue;
+
                src = valptr(set->page, cnt);
                nxt -= src->len + sizeof(BtVal);
                memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
@@ -1887,11 +1900,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(bt->frame, ++idx)->off = nxt;
-                       slotptr(bt->frame, idx)->type = Librarian;
-                       slotptr(bt->frame, idx)->dead = 1;
-               }
+               slotptr(bt->frame, ++idx)->off = nxt;
+               slotptr(bt->frame, idx)->type = Librarian;
+               slotptr(bt->frame, idx)->dead = 1;
 
                //  add actual slot
 
@@ -1960,11 +1971,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(set->page, ++idx)->off = nxt;
-                       slotptr(set->page, idx)->type = Librarian;
-                       slotptr(set->page, idx)->dead = 1;
-               }
+               slotptr(set->page, ++idx)->off = nxt;
+               slotptr(set->page, idx)->type = Librarian;
+               slotptr(set->page, idx)->dead = 1;
 
                //      add actual slot
 
@@ -1995,16 +2004,18 @@ uint prev;
 
        // insert new fence for reformulated left block of smaller keys
 
+       ptr = (BtKey*)fencekey;
        bt_putid (value, set->page_no);
 
-       if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
+       if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
                return bt->err;
 
        // switch fence for right block of larger keys to new right page
 
+       ptr = (BtKey*)rightkey;
        bt_putid (value, right->page_no);
 
-       if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
+       if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
                return bt->err;
 
        bt_unlockpage (BtLockParent, set->latch);