X-Git-Url: https://pd.if.org/git/?p=btree;a=blobdiff_plain;f=threadskv6.c;h=9350ca5040fb5511f0a8da5e18a05e8b017faf58;hp=ce037105c8055c674d3df8ed7c15a4852784914a;hb=HEAD;hpb=909f9ba60c19f17a2388e1308e9ec437391f5ecb diff --git a/threadskv6.c b/threadskv6.c index ce03710..9350ca5 100644 --- a/threadskv6.c +++ b/threadskv6.c @@ -145,9 +145,10 @@ typedef struct { RWLock readwr[1]; // read/write page lock RWLock access[1]; // Access Intent/Page delete RWLock parent[1]; // Posting of fence key in parent - volatile uint next; // next entry in hash table chain - volatile uint prev; // prev entry in hash table chain - volatile ushort pin; // number of outstanding latches + uint slot; // entry slot in latch table + uint next; // next entry in hash table chain + uint prev; // prev entry in hash table chain + volatile ushort pin; // number of outstanding threads ushort dirty:1; // page in cache is dirty } BtLatchSet; @@ -634,16 +635,17 @@ uint amt[1]; BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit) { -BtPage page = (BtPage)((uid)slot * bt->mgr->page_size + bt->mgr->pagepool); +BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool); BtLatchSet *latch = bt->mgr->latchsets + slot; if( latch->next = bt->mgr->hashtable[hashidx].slot ) bt->mgr->latchsets[latch->next].prev = slot; bt->mgr->hashtable[hashidx].slot = slot; - latch->pin = CLOCK_bit | 1; // initialize clock latch->page_no = page_no; + latch->slot = slot; latch->prev = 0; + latch->pin = 1; if( loadit ) if( bt->err = bt_readpage (bt->mgr, page, page_no) ) @@ -654,22 +656,27 @@ BtLatchSet *latch = bt->mgr->latchsets + slot; return bt->err = 0; } -// release latch pin +// set CLOCK bit in latch +// decrement pin count -void bt_unpinlatch (BtLatchSet *set) +void bt_unpinlatch (BtLatchSet *latch) { #ifdef unix - __sync_fetch_and_add(&set->pin, -1); + if( ~latch->pin & CLOCK_bit ) + __sync_fetch_and_or(&latch->pin, CLOCK_bit); + __sync_fetch_and_add(&latch->pin, -1); #else - _InterlockedDecrement16 (&set->pin); + if( ~latch->pin & CLOCK_bit ) + _InterlockedOr16 (&latch->pin, CLOCK_bit); + _InterlockedDecrement16 (&latch->pin); #endif } -// map the btree cached page onto current page +// return the btree cached page address BtPage bt_mappage (BtDb *bt, BtLatchSet *latch) { -BtPage page = (BtPage)((uid)(latch - bt->mgr->latchsets) * bt->mgr->page_size + bt->mgr->pagepool); +BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool); return page; } @@ -705,10 +712,8 @@ BtPage page; latch = bt->mgr->latchsets + slot; #ifdef unix __sync_fetch_and_add(&latch->pin, 1); - __sync_fetch_and_or(&latch->pin, CLOCK_bit); #else _InterlockedIncrement16 (&latch->pin); - _InterlockedOr16 (&latch->pin, CLOCK_bit); #endif bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch); return latch; @@ -762,19 +767,24 @@ BtPage page; if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) ) continue; - if( latch->pin & CLOCK_bit ) { + // skip this slot if it is pinned + // or the CLOCK bit is set + + if( latch->pin ) { + if( latch->pin & CLOCK_bit ) { #ifdef unix - __sync_fetch_and_add(&latch->pin, -CLOCK_bit); + __sync_fetch_and_and(&latch->pin, ~CLOCK_bit); #else - _InterlockedExchangeAdd16 (&latch->pin, -CLOCK_bit); + _InterlockedAnd16 (&latch->pin, ~CLOCK_bit); #endif + } bt_spinreleasewrite (bt->mgr->hashtable[idx].latch); continue; } - // update permanent page area in btree + // update permanent page area in btree from buffer pool - page = (BtPage)((uid)slot * bt->mgr->page_size + bt->mgr->pagepool); + page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool); if( latch->dirty ) if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) ) @@ -812,19 +822,20 @@ uint slot; // flush dirty pool pages to the btree for( slot = 1; slot <= mgr->latchdeployed; slot++ ) { - page = (BtPage)((uid)slot * mgr->page_size + mgr->pagepool); + page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool); latch = mgr->latchsets + slot; if( latch->dirty ) { bt_writepage(mgr, page, latch->page_no); latch->dirty = 0, num++; } +// madvise (page, mgr->page_size, MADV_DONTNEED); } fprintf(stderr, "%d buffer pool pages flushed\n", num); #ifdef unix - munmap (mgr->hashtable, mgr->nlatchpage * mgr->page_size); + munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits); munmap (mgr->pagezero, mgr->page_size); #else FlushViewOfFile(mgr->pagezero, 0); @@ -941,7 +952,7 @@ BtVal *val; // calculate number of latch hash table entries mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size; - mgr->latchhash = mgr->nlatchpage * mgr->page_size / sizeof(BtHashEntry); + mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry); mgr->nlatchpage += nodemax; // size of the buffer pool in pages mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size; @@ -1003,14 +1014,14 @@ mgrlatch: // mlock the pagezero page flag = PROT_READ | PROT_WRITE; - mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size); + mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits); if( mgr->pagezero == MAP_FAILED ) { fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno); return bt_mgrclose (mgr), NULL; } mlock (mgr->pagezero, mgr->page_size); - mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage * mgr->page_size, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0); + mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0); if( mgr->hashtable == MAP_FAILED ) { fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno); return bt_mgrclose (mgr), NULL; @@ -1046,7 +1057,7 @@ mgrlatch: } #endif - mgr->pagepool = (unsigned char *)mgr->hashtable + (uid)(mgr->nlatchpage - mgr->latchtotal) * mgr->page_size; + mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits); mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet)); return mgr; @@ -1739,7 +1750,9 @@ BtVal *val; while( cnt++ < max ) { if( cnt == slot ) newslot = idx + 2; - if( cnt < max && slotptr(bt->frame,cnt)->dead ) + + if( cnt < max || page->lvl ) + if( slotptr(bt->frame,cnt)->dead ) continue; // copy the value across @@ -1756,11 +1769,9 @@ BtVal *val; // make a librarian slot - if( idx ) { - slotptr(page, ++idx)->off = nxt; - slotptr(page, idx)->type = Librarian; - slotptr(page, idx)->dead = 1; - } + slotptr(page, ++idx)->off = nxt; + slotptr(page, idx)->type = Librarian; + slotptr(page, idx)->dead = 1; // set up the slot @@ -1874,8 +1885,10 @@ uint prev; idx = 0; while( cnt++ < max ) { - if( slotptr(set->page, cnt)->dead && cnt < max ) + if( cnt < max || set->page->lvl ) + if( slotptr(set->page, cnt)->dead ) continue; + src = valptr(set->page, cnt); nxt -= src->len + sizeof(BtVal); memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal)); @@ -1887,11 +1900,9 @@ uint prev; // add librarian slot - if( idx ) { - slotptr(bt->frame, ++idx)->off = nxt; - slotptr(bt->frame, idx)->type = Librarian; - slotptr(bt->frame, idx)->dead = 1; - } + slotptr(bt->frame, ++idx)->off = nxt; + slotptr(bt->frame, idx)->type = Librarian; + slotptr(bt->frame, idx)->dead = 1; // add actual slot @@ -1960,11 +1971,9 @@ uint prev; // add librarian slot - if( idx ) { - slotptr(set->page, ++idx)->off = nxt; - slotptr(set->page, idx)->type = Librarian; - slotptr(set->page, idx)->dead = 1; - } + slotptr(set->page, ++idx)->off = nxt; + slotptr(set->page, idx)->type = Librarian; + slotptr(set->page, idx)->dead = 1; // add actual slot @@ -1995,16 +2004,18 @@ uint prev; // insert new fence for reformulated left block of smaller keys + ptr = (BtKey*)fencekey; bt_putid (value, set->page_no); - if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) ) + if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) ) return bt->err; // switch fence for right block of larger keys to new right page + ptr = (BtKey*)rightkey; bt_putid (value, right->page_no); - if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) ) + if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) ) return bt->err; bt_unlockpage (BtLockParent, set->latch);