]> pd.if.org Git - btree/commitdiff
Fix a couple of concurrency problems
authorunknown <karl@E04.petzent.com>
Thu, 13 Feb 2014 19:05:58 +0000 (11:05 -0800)
committerunknown <karl@E04.petzent.com>
Thu, 13 Feb 2014 19:05:58 +0000 (11:05 -0800)
threads2h.c
threads2i.c
threads2j.c

index bb8c2363c850d32f4883bfe1d181a10b9abd7d76..11fb8b457962a71e3ef6eeb75af24220f1295191 100644 (file)
@@ -1630,9 +1630,9 @@ BtPageSet next[1];
 int chk;
 
   memcpy (oldfence, set->page->fence, 256);
+  next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
 
   while( !set->page->kill && set->page->lvl ) {
-       next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
        next->latch = bt_pinlatch (bt, next->page_no);
        bt_lockpage (BtLockParent, next->latch);
        bt_lockpage (BtLockAccess, next->latch);
@@ -1846,10 +1846,24 @@ BtKey ptr;
 
        //      load and lock our parent
 
-retry:
+  while( 1 ) {
        if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
                return bt->err;
 
+       //      do we show up in our parent yet?
+
+       if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) {
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
+
        //      can we do a simple merge entirely
        //      between siblings on the parent page?
 
@@ -1903,7 +1917,7 @@ retry:
 #else
                SwitchToThread();
 #endif
-               goto retry;
+               continue;
        }
 
        //  find our left neighbor in our parent page
@@ -1967,7 +1981,7 @@ retry:
 #else
                SwitchToThread();
 #endif
-               goto retry;
+               continue;
        }
 
        // redirect parent to our left sibling
@@ -2003,7 +2017,7 @@ retry:
 #else
                SwitchToThread();
 #endif
-               goto retry;
+               continue;
        }
 
        //      delete our left sibling from parent
@@ -2061,6 +2075,7 @@ retry:
        bt_unpinpool (parent->pool);
 
        return 0;
+  }
 }
 
 //  find and delete key on page by marking delete flag bit
@@ -2122,8 +2137,8 @@ uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
 {
 BtPageSet set[1];
 uint  slot;
+uid id = 0;
 BtKey ptr;
-uid id;
 
        if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
                ptr = keyptr(set->page, slot);
@@ -2133,10 +2148,9 @@ uid id;
        // if key exists, return row-id
        //      otherwise return 0
 
-       if( !keycmp (ptr, key, len) )
+       if( slot <= set->page->cnt )
+         if( !keycmp (ptr, key, len) )
                id = bt_getid(slotptr(set->page,slot)->id);
-       else
-               id = 0;
 
        bt_unlockpage (BtLockRead, set->latch);
        bt_unpinlatch (set->latch);
@@ -2734,21 +2748,18 @@ FILE *in;
 
        case 'c':
                fprintf(stderr, "started counting\n");
-
-               do {
-                       if( set->pool = bt_pinpool (bt, page_no) )
-                               set->page = bt_page (bt, set->pool, page_no);
-                       else
-                               break;
-                       set->latch = bt_pinlatch (bt, page_no);
-                       bt_lockpage (BtLockRead, set->latch);
-                       cnt += set->page->act;
-                       next = bt_getid (set->page->right);
-                       bt_unlockpage (BtLockRead, set->latch);
-                       bt_unpinlatch (set->latch);
-                       bt_unpinpool (set->pool);
-               } while( page_no = next );
-
+               next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
+               page_no = LEAF_page;
+
+               while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
+                       pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, page_no << bt->mgr->page_bits);
+                       if( !bt->frame->free && !bt->frame->lvl )
+                               cnt += bt->frame->act;
+                       if( page_no > LEAF_page )
+                               next = page_no + 1;
+                       page_no = next;
+               }
+               
                cnt--;  // remove stopper key
                fprintf(stderr, " Total keys read %d\n", cnt);
                break;
index 5f003793424653ad07bc9ee6bf743969f80c2a65..309ecaf8a505e0b73b604529ab7a92ed2326bd7f 100644 (file)
@@ -1545,9 +1545,9 @@ BtPageSet next[1];
 int chk;
 
   memcpy (oldfence, set->page->fence, 256);
+  next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
 
   while( !set->page->kill && set->page->lvl ) {
-       next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
        next->latch = bt_pinlatch (bt, next->page_no);
        bt_lockpage (BtLockParent, next->latch);
        bt_lockpage (BtLockAccess, next->latch);
@@ -1761,10 +1761,24 @@ BtKey ptr;
 
        //      load and lock our parent
 
-retry:
+  while( 1 ) {
        if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
                return bt->err;
 
+       //      do we show up in our parent yet?
+
+       if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) {
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
+
        //      can we do a simple merge entirely
        //      between siblings on the parent page?
 
@@ -1818,7 +1832,7 @@ retry:
 #else
                SwitchToThread();
 #endif
-               goto retry;
+               continue;
        }
 
        //  find our left neighbor in our parent page
@@ -1882,7 +1896,7 @@ retry:
 #else
                SwitchToThread();
 #endif
-               goto retry;
+               continue;
        }
 
        // redirect parent to our left sibling
@@ -1918,7 +1932,7 @@ retry:
 #else
                SwitchToThread();
 #endif
-               goto retry;
+               continue;
        }
 
        //      delete our left sibling from parent
@@ -1976,6 +1990,7 @@ retry:
        bt_unpinpool (parent->pool);
 
        return 0;
+  }
 }
 
 //  find and delete key on page by marking delete flag bit
@@ -2037,8 +2052,8 @@ uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
 {
 BtPageSet set[1];
 uint  slot;
+uid id = 0;
 BtKey ptr;
-uid id;
 
        if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
                ptr = keyptr(set->page, slot);
@@ -2048,10 +2063,9 @@ uid id;
        // if key exists, return row-id
        //      otherwise return 0
 
-       if( !keycmp (ptr, key, len) )
+       if( slot <= set->page->cnt )
+         if( !keycmp (ptr, key, len) )
                id = bt_getid(slotptr(set->page,slot)->id);
-       else
-               id = 0;
 
        bt_unlockpage (BtLockRead, set->latch);
        bt_unpinlatch (set->latch);
@@ -2668,21 +2682,18 @@ FILE *in;
 
        case 'c':
                fprintf(stderr, "started counting\n");
-
-               do {
-                       if( set->pool = bt_pinpool (bt, page_no) )
-                               set->page = bt_page (bt, set->pool, page_no);
-                       else
-                               break;
-                       set->latch = bt_pinlatch (bt, page_no);
-                       bt_lockpage (BtLockRead, set->latch);
-                       cnt += set->page->act;
-                       next = bt_getid (set->page->right);
-                       bt_unlockpage (BtLockRead, set->latch);
-                       bt_unpinlatch (set->latch);
-                       bt_unpinpool (set->pool);
-               } while( page_no = next );
-
+               next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
+               page_no = LEAF_page;
+
+               while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
+                       pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, page_no << bt->mgr->page_bits);
+                       if( !bt->frame->free && !bt->frame->lvl )
+                               cnt += bt->frame->act;
+                       if( page_no > LEAF_page )
+                               next = page_no + 1;
+                       page_no = next;
+               }
+               
                cnt--;  // remove stopper key
                fprintf(stderr, " Total keys read %d\n", cnt);
                break;
index 70052ee699622a94004df5f291af933b3b6cc4de..e2faef4f45d64020b1ec629fe66cdaa15913be28 100644 (file)
@@ -1580,9 +1580,9 @@ BtPageSet next[1];
 int chk;
 
   memcpy (oldfence, set->page->fence, 256);
+  next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
 
   while( !set->page->kill && set->page->lvl ) {
-       next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
        next->latch = bt_pinlatch (bt, next->page_no);
        bt_lockpage (BtLockParent, next->latch);
        bt_lockpage (BtLockAccess, next->latch);
@@ -1796,10 +1796,24 @@ BtKey ptr;
 
        //      load and lock our parent
 
-retry:
+  while( 1 ) {
        if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
                return bt->err;
 
+       //      do we show up in our parent yet?
+
+       if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) {
+               bt_unlockpage (BtLockWrite, parent->latch);
+               bt_unpinlatch (parent->latch);
+               bt_unpinpool (parent->pool);
+#ifdef linux
+               sched_yield();
+#else
+               SwitchToThread();
+#endif
+               continue;
+       }
+
        //      can we do a simple merge entirely
        //      between siblings on the parent page?
 
@@ -1853,7 +1867,7 @@ retry:
 #else
                SwitchToThread();
 #endif
-               goto retry;
+               continue;
        }
 
        //  find our left neighbor in our parent page
@@ -1917,7 +1931,7 @@ retry:
 #else
                SwitchToThread();
 #endif
-               goto retry;
+               continue;
        }
 
        // redirect parent to our left sibling
@@ -1953,7 +1967,7 @@ retry:
 #else
                SwitchToThread();
 #endif
-               goto retry;
+               continue;
        }
 
        //      delete our left sibling from parent
@@ -2011,6 +2025,7 @@ retry:
        bt_unpinpool (parent->pool);
 
        return 0;
+  }
 }
 
 //  find and delete key on page by marking delete flag bit
@@ -2072,8 +2087,8 @@ uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
 {
 BtPageSet set[1];
 uint  slot;
+uid id = 0;
 BtKey ptr;
-uid id;
 
        if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
                ptr = keyptr(set->page, slot);
@@ -2083,10 +2098,9 @@ uid id;
        // if key exists, return row-id
        //      otherwise return 0
 
-       if( !keycmp (ptr, key, len) )
+       if( slot <= set->page->cnt )
+         if( !keycmp (ptr, key, len) )
                id = bt_getid(slotptr(set->page,slot)->id);
-       else
-               id = 0;
 
        bt_unlockpage (BtLockRead, set->latch);
        bt_unpinlatch (set->latch);
@@ -2703,21 +2717,18 @@ FILE *in;
 
        case 'c':
                fprintf(stderr, "started counting\n");
-
-               do {
-                       if( set->pool = bt_pinpool (bt, page_no) )
-                               set->page = bt_page (bt, set->pool, page_no);
-                       else
-                               break;
-                       set->latch = bt_pinlatch (bt, page_no);
-                       bt_lockpage (BtLockRead, set->latch);
-                       cnt += set->page->act;
-                       next = bt_getid (set->page->right);
-                       bt_unlockpage (BtLockRead, set->latch);
-                       bt_unpinlatch (set->latch);
-                       bt_unpinpool (set->pool);
-               } while( page_no = next );
-
+               next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
+               page_no = LEAF_page;
+
+               while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
+                       pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, page_no << bt->mgr->page_bits);
+                       if( !bt->frame->free && !bt->frame->lvl )
+                               cnt += bt->frame->act;
+                       if( page_no > LEAF_page )
+                               next = page_no + 1;
+                       page_no = next;
+               }
+               
                cnt--;  // remove stopper key
                fprintf(stderr, " Total keys read %d\n", cnt);
                break;