]> pd.if.org Git - btree/commitdiff
Fix Librarian bugs in bt_cleanpage and bt_splitpage
authorunknown <karl@E04.petzent.com>
Mon, 29 Sep 2014 22:26:57 +0000 (15:26 -0700)
committerunknown <karl@E04.petzent.com>
Mon, 29 Sep 2014 22:26:57 +0000 (15:26 -0700)
threadskv5.c
threadskv6.c
threadskv7.c
threadskv8.c

index acda33bb7644896777e2f0a62b9aeb0914e6b502..ec46ace3a3f7738d261fe9587ecf1ae7df7940d9 100644 (file)
@@ -2007,7 +2007,9 @@ BtVal *val;
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 2;
-               if( cnt < max && slotptr(bt->frame,cnt)->dead )
+
+               if( cnt < max || page->lvl )
+                 if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy the value across
@@ -2024,11 +2026,9 @@ BtVal *val;
 
                // make a librarian slot
 
-               if( idx ) {
-                       slotptr(page, ++idx)->off = nxt;
-                       slotptr(page, idx)->type = Librarian;
-                       slotptr(page, idx)->dead = 1;
-               }
+               slotptr(page, ++idx)->off = nxt;
+               slotptr(page, idx)->type = Librarian;
+               slotptr(page, idx)->dead = 1;
 
                // set up the slot
 
@@ -2144,7 +2144,8 @@ uint prev;
        idx = 0;
 
        while( cnt++ < max ) {
-               if( slotptr(set->page, cnt)->dead && cnt < max )
+               if( cnt < max || set->page->lvl )
+                 if( slotptr(set->page, cnt)->dead )
                        continue;
                src = valptr(set->page, cnt);
                nxt -= src->len + sizeof(BtVal);
@@ -2156,11 +2157,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(bt->frame, ++idx)->off = nxt;
-                       slotptr(bt->frame, idx)->type = Librarian;
-                       slotptr(bt->frame, idx)->dead = 1;
-               }
+               slotptr(bt->frame, ++idx)->off = nxt;
+               slotptr(bt->frame, idx)->type = Librarian;
+               slotptr(bt->frame, idx)->dead = 1;
 
                //  add actual slot
 
@@ -2229,11 +2228,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(set->page, ++idx)->off = nxt;
-                       slotptr(set->page, idx)->type = Librarian;
-                       slotptr(set->page, idx)->dead = 1;
-               }
+               slotptr(set->page, ++idx)->off = nxt;
+               slotptr(set->page, idx)->type = Librarian;
+               slotptr(set->page, idx)->dead = 1;
 
                //      add actual slot
 
index be7dad672f352ed1cb495d27f3f9fdfa73811225..9350ca5040fb5511f0a8da5e18a05e8b017faf58 100644 (file)
@@ -1750,7 +1750,9 @@ BtVal *val;
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 2;
-               if( cnt < max && slotptr(bt->frame,cnt)->dead )
+
+               if( cnt < max || page->lvl )
+                if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy the value across
@@ -1767,11 +1769,9 @@ BtVal *val;
 
                // make a librarian slot
 
-               if( idx ) {
-                       slotptr(page, ++idx)->off = nxt;
-                       slotptr(page, idx)->type = Librarian;
-                       slotptr(page, idx)->dead = 1;
-               }
+               slotptr(page, ++idx)->off = nxt;
+               slotptr(page, idx)->type = Librarian;
+               slotptr(page, idx)->dead = 1;
 
                // set up the slot
 
@@ -1885,8 +1885,10 @@ uint prev;
        idx = 0;
 
        while( cnt++ < max ) {
-               if( slotptr(set->page, cnt)->dead && cnt < max )
+               if( cnt < max || set->page->lvl )
+                 if( slotptr(set->page, cnt)->dead )
                        continue;
+
                src = valptr(set->page, cnt);
                nxt -= src->len + sizeof(BtVal);
                memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
@@ -1898,11 +1900,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(bt->frame, ++idx)->off = nxt;
-                       slotptr(bt->frame, idx)->type = Librarian;
-                       slotptr(bt->frame, idx)->dead = 1;
-               }
+               slotptr(bt->frame, ++idx)->off = nxt;
+               slotptr(bt->frame, idx)->type = Librarian;
+               slotptr(bt->frame, idx)->dead = 1;
 
                //  add actual slot
 
@@ -1971,11 +1971,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(set->page, ++idx)->off = nxt;
-                       slotptr(set->page, idx)->type = Librarian;
-                       slotptr(set->page, idx)->dead = 1;
-               }
+               slotptr(set->page, ++idx)->off = nxt;
+               slotptr(set->page, idx)->type = Librarian;
+               slotptr(set->page, idx)->dead = 1;
 
                //      add actual slot
 
index 36fd43f7aebdaf070f9eb884c4649da5919238e9..6c74edbbf33f38f9f3a7418e2e3504b32de92e5b 100644 (file)
@@ -1790,7 +1790,8 @@ BtVal *val;
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 2;
-               if( cnt < max && slotptr(bt->frame,cnt)->dead )
+               if( cnt < max || page->lvl )
+                 if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy the value across
@@ -1807,11 +1808,9 @@ BtVal *val;
 
                // make a librarian slot
 
-               if( idx ) {
-                       slotptr(page, ++idx)->off = nxt;
-                       slotptr(page, idx)->type = Librarian;
-                       slotptr(page, idx)->dead = 1;
-               }
+               slotptr(page, ++idx)->off = nxt;
+               slotptr(page, idx)->type = Librarian;
+               slotptr(page, idx)->dead = 1;
 
                // set up the slot
 
@@ -1925,8 +1924,10 @@ uint prev;
        idx = 0;
 
        while( cnt++ < max ) {
-               if( slotptr(set->page, cnt)->dead && cnt < max )
+               if( cnt < max || set->page->lvl )
+                 if( slotptr(set->page, cnt)->dead && cnt < max )
                        continue;
+
                src = valptr(set->page, cnt);
                nxt -= src->len + sizeof(BtVal);
                memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
@@ -1938,11 +1939,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(bt->frame, ++idx)->off = nxt;
-                       slotptr(bt->frame, idx)->type = Librarian;
-                       slotptr(bt->frame, idx)->dead = 1;
-               }
+               slotptr(bt->frame, ++idx)->off = nxt;
+               slotptr(bt->frame, idx)->type = Librarian;
+               slotptr(bt->frame, idx)->dead = 1;
 
                //  add actual slot
 
@@ -2011,11 +2010,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(set->page, ++idx)->off = nxt;
-                       slotptr(set->page, idx)->type = Librarian;
-                       slotptr(set->page, idx)->dead = 1;
-               }
+               slotptr(set->page, ++idx)->off = nxt;
+               slotptr(set->page, idx)->type = Librarian;
+               slotptr(set->page, idx)->dead = 1;
 
                //      add actual slot
 
index 44411881f67455309eede48bb81ef3ae46e81d77..6b377106ad1666078fbfa663a520df47ea0b9dc4 100644 (file)
@@ -1777,7 +1777,9 @@ BtVal *val;
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 2;
-               if( cnt < max && slotptr(bt->frame,cnt)->dead )
+
+               if( cnt < max || bt->frame->lvl )
+                 if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy the value across
@@ -1794,11 +1796,9 @@ BtVal *val;
 
                // make a librarian slot
 
-               if( idx ) {
-                       slotptr(page, ++idx)->off = nxt;
-                       slotptr(page, idx)->type = Librarian;
-                       slotptr(page, idx)->dead = 1;
-               }
+               slotptr(page, ++idx)->off = nxt;
+               slotptr(page, idx)->type = Librarian;
+               slotptr(page, idx)->dead = 1;
 
                // set up the slot
 
@@ -1918,8 +1918,10 @@ uint prev;
        idx = 0;
 
        while( cnt++ < max ) {
-               if( slotptr(set->page, cnt)->dead && cnt < max )
+               if( cnt < max || set->page->lvl )
+                 if( slotptr(set->page, cnt)->dead )
                        continue;
+
                src = valptr(set->page, cnt);
                nxt -= src->len + sizeof(BtVal);
                memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
@@ -1931,11 +1933,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(bt->frame, ++idx)->off = nxt;
-                       slotptr(bt->frame, idx)->type = Librarian;
-                       slotptr(bt->frame, idx)->dead = 1;
-               }
+               slotptr(bt->frame, ++idx)->off = nxt;
+               slotptr(bt->frame, idx)->type = Librarian;
+               slotptr(bt->frame, idx)->dead = 1;
 
                //  add actual slot
 
@@ -1990,11 +1990,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(set->page, ++idx)->off = nxt;
-                       slotptr(set->page, idx)->type = Librarian;
-                       slotptr(set->page, idx)->dead = 1;
-               }
+               slotptr(set->page, ++idx)->off = nxt;
+               slotptr(set->page, idx)->type = Librarian;
+               slotptr(set->page, idx)->dead = 1;
 
                //      add actual slot
 
@@ -2264,7 +2262,7 @@ typedef struct {
        uint entry;                     // latch table entry number
        uint slot:31;           // page slot number
        uint reuse:1;           // reused previous page
-} AtomicMod;
+} AtomicTxn;
 
 typedef struct {
        uid page_no;            // page number for split leaf
@@ -2351,7 +2349,7 @@ uint slot;
 //     determine actual page where key is located
 //  return slot number
 
-uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
+uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
 {
 BtKey *key = keyptr(source,src);
 uint slot = locks[src].slot;
@@ -2368,15 +2366,18 @@ uint entry;
                return slot;
        }
 
-       //      is locks->reuse set?
-       //      if so, find where our key
-       //      is located on previous page or split pages
+       //      is locks->reuse set? or was slot zeroed?
+       //      if so, find where our key is located 
+       //      on current page or pages split on
+       //      same page txn operations.
 
        do {
                set->latch = bt->mgr->latchsets + entry;
                set->page = bt_mappage (bt, set->latch);
 
                if( slot = bt_findslot(set->page, key->key, key->len) ) {
+                 if( slotptr(set->page, slot)->type == Librarian )
+                       slot++;
                  if( locks[src].reuse )
                        locks[src].entry = entry;
                  return slot;
@@ -2387,17 +2388,17 @@ uint entry;
        return 0;
 }
 
-BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
+BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
 {
 BtKey *key = keyptr(source, src);
 BtVal *val = valptr(source, src);
 BtLatchSet *latch;
 BtPageSet set[1];
-uint entry;
+uint entry, slot;
 
-  while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
-       if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
-         return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
+  while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
+       if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
+         return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
 
        if( entry = bt_splitpage (bt, set) )
          latch = bt->mgr->latchsets + entry;
@@ -2407,15 +2408,16 @@ uint entry;
        //      splice right page into split chain
        //      and WriteLock it.
 
+       bt_lockpage(bt, BtLockWrite, latch);
        latch->split = set->latch->split;
        set->latch->split = entry;
-       bt_lockpage(bt, BtLockWrite, latch);
+       locks[src].slot = 0;
   }
 
   return bt->err = BTERR_atomic;
 }
 
-BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
+BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
 {
 BtKey *key = keyptr(source, src);
 uint idx, entry, slot;
@@ -2424,16 +2426,23 @@ BtKey *ptr;
 BtVal *val;
 
        if( slot = bt_atomicpage (bt, source, locks, src, set) )
+         ptr = keyptr(set->page, slot);
+       else
+         return bt->err = BTERR_struct;
+
+       if( !keycmp (ptr, key->key, key->len) )
+         if( !slotptr(set->page, slot)->dead )
                slotptr(set->page, slot)->dead = 1;
+         else
+               return 0;
        else
-               return bt->err = BTERR_struct;
+               return 0;
 
-       ptr = keyptr(set->page, slot);
        val = valptr(set->page, slot);
-
        set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
        set->latch->dirty = 1;
        set->page->act--;
+       bt->found++;
        return 0;
 }
 
@@ -2536,7 +2545,7 @@ BtPageSet set[1], prev[1];
 unsigned char value[BtId];
 BtKey *key, *ptr, *key2;
 BtLatchSet *latch;
-AtomicMod *locks;
+AtomicTxn *locks;
 int result = 0;
 BtSlot temp[1];
 BtPage page;
@@ -2544,7 +2553,7 @@ BtVal *val;
 uid right;
 int type;
 
-  locks = calloc (source->cnt + 1, sizeof(AtomicMod));
+  locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
   head = NULL;
   tail = NULL;
 
@@ -3287,7 +3296,7 @@ FILE *in;
                        }
                        else if( len < BT_maxkey )
                                key[len++] = ch;
-               fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
+               fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
                break;
 
        case 'w':