From 981bb7644f7b96b38a4811a9e6567497c3c69e86 Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 13 Feb 2014 11:05:58 -0800 Subject: [PATCH] Fix a couple of concurrency problems --- threads2h.c | 59 +++++++++++++++++++++++++++++++---------------------- threads2i.c | 59 +++++++++++++++++++++++++++++++---------------------- threads2j.c | 59 +++++++++++++++++++++++++++++++---------------------- 3 files changed, 105 insertions(+), 72 deletions(-) diff --git a/threads2h.c b/threads2h.c index bb8c236..11fb8b4 100644 --- a/threads2h.c +++ b/threads2h.c @@ -1630,9 +1630,9 @@ BtPageSet next[1]; int chk; memcpy (oldfence, set->page->fence, 256); + next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id); while( !set->page->kill && set->page->lvl ) { - next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id); next->latch = bt_pinlatch (bt, next->page_no); bt_lockpage (BtLockParent, next->latch); bt_lockpage (BtLockAccess, next->latch); @@ -1846,10 +1846,24 @@ BtKey ptr; // load and lock our parent -retry: + while( 1 ) { if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) ) return bt->err; + // do we show up in our parent yet? + + if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) { + bt_unlockpage (BtLockWrite, parent->latch); + bt_unpinlatch (parent->latch); + bt_unpinpool (parent->pool); +#ifdef linux + sched_yield(); +#else + SwitchToThread(); +#endif + continue; + } + // can we do a simple merge entirely // between siblings on the parent page? @@ -1903,7 +1917,7 @@ retry: #else SwitchToThread(); #endif - goto retry; + continue; } // find our left neighbor in our parent page @@ -1967,7 +1981,7 @@ retry: #else SwitchToThread(); #endif - goto retry; + continue; } // redirect parent to our left sibling @@ -2003,7 +2017,7 @@ retry: #else SwitchToThread(); #endif - goto retry; + continue; } // delete our left sibling from parent @@ -2061,6 +2075,7 @@ retry: bt_unpinpool (parent->pool); return 0; + } } // find and delete key on page by marking delete flag bit @@ -2122,8 +2137,8 @@ uid bt_findkey (BtDb *bt, unsigned char *key, uint len) { BtPageSet set[1]; uint slot; +uid id = 0; BtKey ptr; -uid id; if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) ) ptr = keyptr(set->page, slot); @@ -2133,10 +2148,9 @@ uid id; // if key exists, return row-id // otherwise return 0 - if( !keycmp (ptr, key, len) ) + if( slot <= set->page->cnt ) + if( !keycmp (ptr, key, len) ) id = bt_getid(slotptr(set->page,slot)->id); - else - id = 0; bt_unlockpage (BtLockRead, set->latch); bt_unpinlatch (set->latch); @@ -2734,21 +2748,18 @@ FILE *in; case 'c': fprintf(stderr, "started counting\n"); - - do { - if( set->pool = bt_pinpool (bt, page_no) ) - set->page = bt_page (bt, set->pool, page_no); - else - break; - set->latch = bt_pinlatch (bt, page_no); - bt_lockpage (BtLockRead, set->latch); - cnt += set->page->act; - next = bt_getid (set->page->right); - bt_unlockpage (BtLockRead, set->latch); - bt_unpinlatch (set->latch); - bt_unpinpool (set->pool); - } while( page_no = next ); - + next = bt->mgr->latchmgr->nlatchpage + LATCH_page; + page_no = LEAF_page; + + while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) { + pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, page_no << bt->mgr->page_bits); + if( !bt->frame->free && !bt->frame->lvl ) + cnt += bt->frame->act; + if( page_no > LEAF_page ) + next = page_no + 1; + page_no = next; + } + cnt--; // remove stopper key fprintf(stderr, " Total keys read %d\n", cnt); break; diff --git a/threads2i.c b/threads2i.c index 5f00379..309ecaf 100644 --- a/threads2i.c +++ b/threads2i.c @@ -1545,9 +1545,9 @@ BtPageSet next[1]; int chk; memcpy (oldfence, set->page->fence, 256); + next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id); while( !set->page->kill && set->page->lvl ) { - next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id); next->latch = bt_pinlatch (bt, next->page_no); bt_lockpage (BtLockParent, next->latch); bt_lockpage (BtLockAccess, next->latch); @@ -1761,10 +1761,24 @@ BtKey ptr; // load and lock our parent -retry: + while( 1 ) { if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) ) return bt->err; + // do we show up in our parent yet? + + if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) { + bt_unlockpage (BtLockWrite, parent->latch); + bt_unpinlatch (parent->latch); + bt_unpinpool (parent->pool); +#ifdef linux + sched_yield(); +#else + SwitchToThread(); +#endif + continue; + } + // can we do a simple merge entirely // between siblings on the parent page? @@ -1818,7 +1832,7 @@ retry: #else SwitchToThread(); #endif - goto retry; + continue; } // find our left neighbor in our parent page @@ -1882,7 +1896,7 @@ retry: #else SwitchToThread(); #endif - goto retry; + continue; } // redirect parent to our left sibling @@ -1918,7 +1932,7 @@ retry: #else SwitchToThread(); #endif - goto retry; + continue; } // delete our left sibling from parent @@ -1976,6 +1990,7 @@ retry: bt_unpinpool (parent->pool); return 0; + } } // find and delete key on page by marking delete flag bit @@ -2037,8 +2052,8 @@ uid bt_findkey (BtDb *bt, unsigned char *key, uint len) { BtPageSet set[1]; uint slot; +uid id = 0; BtKey ptr; -uid id; if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) ) ptr = keyptr(set->page, slot); @@ -2048,10 +2063,9 @@ uid id; // if key exists, return row-id // otherwise return 0 - if( !keycmp (ptr, key, len) ) + if( slot <= set->page->cnt ) + if( !keycmp (ptr, key, len) ) id = bt_getid(slotptr(set->page,slot)->id); - else - id = 0; bt_unlockpage (BtLockRead, set->latch); bt_unpinlatch (set->latch); @@ -2668,21 +2682,18 @@ FILE *in; case 'c': fprintf(stderr, "started counting\n"); - - do { - if( set->pool = bt_pinpool (bt, page_no) ) - set->page = bt_page (bt, set->pool, page_no); - else - break; - set->latch = bt_pinlatch (bt, page_no); - bt_lockpage (BtLockRead, set->latch); - cnt += set->page->act; - next = bt_getid (set->page->right); - bt_unlockpage (BtLockRead, set->latch); - bt_unpinlatch (set->latch); - bt_unpinpool (set->pool); - } while( page_no = next ); - + next = bt->mgr->latchmgr->nlatchpage + LATCH_page; + page_no = LEAF_page; + + while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) { + pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, page_no << bt->mgr->page_bits); + if( !bt->frame->free && !bt->frame->lvl ) + cnt += bt->frame->act; + if( page_no > LEAF_page ) + next = page_no + 1; + page_no = next; + } + cnt--; // remove stopper key fprintf(stderr, " Total keys read %d\n", cnt); break; diff --git a/threads2j.c b/threads2j.c index 70052ee..e2faef4 100644 --- a/threads2j.c +++ b/threads2j.c @@ -1580,9 +1580,9 @@ BtPageSet next[1]; int chk; memcpy (oldfence, set->page->fence, 256); + next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id); while( !set->page->kill && set->page->lvl ) { - next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id); next->latch = bt_pinlatch (bt, next->page_no); bt_lockpage (BtLockParent, next->latch); bt_lockpage (BtLockAccess, next->latch); @@ -1796,10 +1796,24 @@ BtKey ptr; // load and lock our parent -retry: + while( 1 ) { if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) ) return bt->err; + // do we show up in our parent yet? + + if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) { + bt_unlockpage (BtLockWrite, parent->latch); + bt_unpinlatch (parent->latch); + bt_unpinpool (parent->pool); +#ifdef linux + sched_yield(); +#else + SwitchToThread(); +#endif + continue; + } + // can we do a simple merge entirely // between siblings on the parent page? @@ -1853,7 +1867,7 @@ retry: #else SwitchToThread(); #endif - goto retry; + continue; } // find our left neighbor in our parent page @@ -1917,7 +1931,7 @@ retry: #else SwitchToThread(); #endif - goto retry; + continue; } // redirect parent to our left sibling @@ -1953,7 +1967,7 @@ retry: #else SwitchToThread(); #endif - goto retry; + continue; } // delete our left sibling from parent @@ -2011,6 +2025,7 @@ retry: bt_unpinpool (parent->pool); return 0; + } } // find and delete key on page by marking delete flag bit @@ -2072,8 +2087,8 @@ uid bt_findkey (BtDb *bt, unsigned char *key, uint len) { BtPageSet set[1]; uint slot; +uid id = 0; BtKey ptr; -uid id; if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) ) ptr = keyptr(set->page, slot); @@ -2083,10 +2098,9 @@ uid id; // if key exists, return row-id // otherwise return 0 - if( !keycmp (ptr, key, len) ) + if( slot <= set->page->cnt ) + if( !keycmp (ptr, key, len) ) id = bt_getid(slotptr(set->page,slot)->id); - else - id = 0; bt_unlockpage (BtLockRead, set->latch); bt_unpinlatch (set->latch); @@ -2703,21 +2717,18 @@ FILE *in; case 'c': fprintf(stderr, "started counting\n"); - - do { - if( set->pool = bt_pinpool (bt, page_no) ) - set->page = bt_page (bt, set->pool, page_no); - else - break; - set->latch = bt_pinlatch (bt, page_no); - bt_lockpage (BtLockRead, set->latch); - cnt += set->page->act; - next = bt_getid (set->page->right); - bt_unlockpage (BtLockRead, set->latch); - bt_unpinlatch (set->latch); - bt_unpinpool (set->pool); - } while( page_no = next ); - + next = bt->mgr->latchmgr->nlatchpage + LATCH_page; + page_no = LEAF_page; + + while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) { + pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, page_no << bt->mgr->page_bits); + if( !bt->frame->free && !bt->frame->lvl ) + cnt += bt->frame->act; + if( page_no > LEAF_page ) + next = page_no + 1; + page_no = next; + } + cnt--; // remove stopper key fprintf(stderr, " Total keys read %d\n", cnt); break; -- 2.40.0