]> pd.if.org Git - btree/blobdiff - threadskv5.c
Fix concurrency bug with CLOCK_bit and latch->pin
[btree] / threadskv5.c
index b5df26d617fe1e242a86a87a593163ae94878553..544bb65de488065dc9310526cbfa7ac3d0543fc1 100644 (file)
@@ -905,6 +905,8 @@ SYSTEM_INFO sysinfo[1];
                        return free(mgr), free(latchmgr), NULL;
        } else if( mode == BT_ro )
                return free(latchmgr), bt_mgrclose (mgr), NULL;
+       else
+               initit = 1;
 #else
        latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
        size = GetFileSize(mgr->idx, amt);
@@ -1205,7 +1207,6 @@ uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
 BtPage page;
 
        page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
-//     madvise (page, bt->mgr->page_size, MADV_WILLNEED);
        return page;
 }
 
@@ -1393,6 +1394,7 @@ void bt_unlockpage(BtLock mode, BtLatchSet *set)
 }
 
 //     allocate a new page
+//     return with page pinned.
 
 BTERR bt_newpage(BtDb *bt, BtPageSet *set)
 {
@@ -1707,6 +1709,7 @@ BtPageSet right2[1];
        bt_putid(right2->page->left, left_page_no);
        bt_unlockpage (BtLockWrite, right2->latch);
        bt_unpinlatch (right2->latch);
+       bt_unpinpool (right2->pool);
        return 0;
 }
 
@@ -1716,8 +1719,8 @@ BtPageSet right2[1];
 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
 {
 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
-BtPageSet set[1], right[1], right2[1];
 uint slot, idx, found, fence;
+BtPageSet set[1], right[1];
 unsigned char value[BtId];
 BtKey *ptr, *tst;
 BtVal *val;
@@ -1935,6 +1938,12 @@ BtVal *val;
        if( slotptr(set->page, slot)->type == Duplicate )
                len -= BtId;
 
+       //      not there if we reach the stopper key
+
+       if( slot == set->page->cnt )
+         if( !bt_getid (set->page->right) )
+               break;
+
        // if key exists, return >= 0 value bytes copied
        //      otherwise return (-1)
 
@@ -2005,8 +2014,7 @@ BtVal *val;
 
                val = valptr(bt->frame, cnt);
                nxt -= val->len + sizeof(BtVal);
-               ((unsigned char *)page)[nxt] = val->len;
-               memcpy ((unsigned char *)page + nxt + sizeof(BtVal), val->value, val->len);
+               memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
 
                // copy the key across
 
@@ -2140,14 +2148,11 @@ uint prev;
                        continue;
                src = valptr(set->page, cnt);
                nxt -= src->len + sizeof(BtVal);
-               val = (BtVal*)((unsigned char *)bt->frame + nxt);
-               memcpy (val->value, src->value, src->len);
-               val->len = src->len;
+               memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
 
                key = keyptr(set->page, cnt);
                nxt -= key->len + sizeof(BtKey);
-               ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
-               memcpy (ptr, key, key->len + sizeof(BtKey));
+               memcpy ((unsigned char *)bt->frame + nxt, key, key->len + sizeof(BtVal));
 
                //      add librarian slot
 
@@ -2194,7 +2199,6 @@ uint prev;
                return bt->err;
 
        memcpy (right->page, bt->frame, bt->mgr->page_size);
-       bt_unpinpool (right->pool);
 
        //      update lower keys to continue in old page
 
@@ -2215,10 +2219,9 @@ uint prev;
        while( cnt++ < max ) {
                if( slotptr(bt->frame, cnt)->dead )
                        continue;
-               val = valptr(bt->frame, cnt);
-               nxt -= val->len + sizeof(BtVal);
-               ((unsigned char *)set->page)[nxt] = val->len;
-               memcpy ((unsigned char *)set->page + nxt + sizeof(BtVal), val->value, val->len);
+               src = valptr(bt->frame, cnt);
+               nxt -= src->len + sizeof(BtVal);
+               memcpy ((unsigned char *)set->page + nxt, src, src->len + sizeof(BtVal));
 
                key = keyptr(bt->frame, cnt);
                nxt -= key->len + sizeof(BtKey);
@@ -2280,6 +2283,7 @@ uint prev;
 
        bt_unlockpage (BtLockParent, right->latch);
        bt_unpinlatch (right->latch);
+       bt_unpinpool (right->pool);
        return 0;
 }
 
@@ -2666,6 +2670,18 @@ struct timeval tv[1];
 }
 #endif
 
+void bt_poolaudit (BtMgr *mgr)
+{
+uint slot = 0;
+BtPool *pool;
+
+       while( slot++ < mgr->poolcnt ) {
+               pool = mgr->pool + slot;
+               if( pool->pin & ~CLOCK_bit )
+                 fprintf(stderr, "pool slot %d pinned\n", slot);
+       }
+}
+
 uint bt_latchaudit (BtDb *bt)
 {
 ushort idx, hashidx;
@@ -2738,7 +2754,7 @@ BtKey *ptr;
                if( !bt->frame->free ) {
                 for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
                  ptr = keyptr(bt->frame, idx+1);
-                 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
+                 if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) > 0 )
                        fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
                 }
                 if( !bt->frame->lvl )
@@ -2855,6 +2871,7 @@ FILE *in;
                        }
                        else if( len < BT_maxkey )
                                key[len++] = ch;
+
                fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
                break;
 
@@ -2889,6 +2906,7 @@ FILE *in;
                                set->page = bt_page (bt, set->pool, page_no);
                        else
                                break;
+                       madvise (set->page, bt->mgr->page_size, MADV_WILLNEED);
                        set->latch = bt_pinlatch (bt, page_no);
                        bt_lockpage (BtLockRead, set->latch);
                        next = bt_getid (set->page->right);
@@ -3086,6 +3104,7 @@ BtDb *bt;
        elapsed = getCpuTime(2);
        fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
 
+       bt_poolaudit (mgr);
        bt_mgrclose (mgr);
 }