]> pd.if.org Git - btree/commitdiff
Fix a few bugs
authorunknown <karl@E04.petzent.com>
Mon, 8 Sep 2014 21:56:18 +0000 (14:56 -0700)
committerunknown <karl@E04.petzent.com>
Mon, 8 Sep 2014 21:56:18 +0000 (14:56 -0700)
threadskv1.c
threadskv3.c

index 3ce5b8a6d9c58fd020d4ba6ecb756ad4cefbefa4..2d95c4a6a3a1c543cbacc716950ab561e1516228 100644 (file)
@@ -1,7 +1,7 @@
 // btree version threadskv1 sched_yield version
 //     with reworked bt_deletekey code
 //     and phase-fair reader writer lock
-// 12 MAR 2014
+// 08 SEP 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -1645,7 +1645,7 @@ uint idx;
 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
 {
 unsigned char lowerfence[256], higherfence[256];
-uint slot, idx, dirty = 0, fence, found;
+uint slot, idx, found, fence;
 BtPageSet set[1], right[1];
 unsigned char value[BtId];
 BtKey ptr;
@@ -1655,7 +1655,7 @@ BtKey ptr;
        else
                return bt->err;
 
-       //      are we deleting a fence slot?
+       //      are we deleting a fence key?
 
        fence = slot == set->page->cnt;
 
@@ -1663,11 +1663,11 @@ BtKey ptr;
 
        if( found = !keycmp (ptr, key, len) )
          if( found = slotptr(set->page, slot)->dead == 0 ) {
-               dirty = slotptr(set->page, slot)->dead = 1;
+               slotptr(set->page, slot)->dead = 1;
                set->page->dirty = 1;
                set->page->act--;
 
-               // collapse empty slots
+               // collapse empty slots beneath our fence
 
                while( idx = set->page->cnt - 1 )
                  if( slotptr(set->page, idx)->dead ) {
@@ -1679,7 +1679,7 @@ BtKey ptr;
 
        //      did we delete a fence key in an upper level?
 
-       if( dirty && lvl && set->page->act && fence )
+       if( found && fence && lvl && set->page->act )
          if( bt_fixfence (bt, set, lvl) )
                return bt->err;
          else
@@ -2057,81 +2057,48 @@ BtVal val;
        bt_unpinlatch (right->latch);
        return 0;
 }
-//  Insert new key into the btree at given level.
+//     install new key and value onto page
+//     page must already be checked for
+//     adequate space
 
-BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
+BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen)
 {
-BtPageSet set[1];
-uint slot, idx;
-uint reuse;
-BtKey ptr;
-BtVal val;
-
-       while( 1 ) {
-               if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
-                       ptr = keyptr(set->page, slot);
-               else
-               {
-                       if( !bt->err )
-                               bt->err = BTERR_ovflw;
-                       return bt->err;
-               }
-
-               // if key already exists, update value and return
-
-               if( reuse = !keycmp (ptr, key, keylen) )
-                 if( val = valptr(set->page, slot), val->len >= vallen ) {
-                       if( slotptr(set->page, slot)->dead )
-                               set->page->act++;
-                       slotptr(set->page, slot)->dead = 0;
-                       val->len = vallen;
-                       memcpy (val->value, value, vallen);
-                       bt_unlockpage(BtLockWrite, set->latch);
-                       bt_unpinlatch (set->latch);
-                       bt_unpinpool (set->pool);
-                       return 0;
-                 } else {
-                       if( !slotptr(set->page, slot)->dead )
-                               set->page->act--;
-                       slotptr(set->page, slot)->dead = 1;
-                       set->page->dirty = 1;
-                 }
-
-               // check if page has enough space
-
-               if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) )
-                       break;
-
-               if( bt_splitpage (bt, set) )
-                       return bt->err;
-       }
+BtSlot *node;
+uint idx;
 
-       // calculate next available slot and copy key into page
+       // copy value onto page
 
        set->page->min -= vallen + 1; // reset lowest used offset
        ((unsigned char *)set->page)[set->page->min] = vallen;
        memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
 
+       // copy key onto page
+
        set->page->min -= keylen + 1; // reset lowest used offset
        ((unsigned char *)set->page)[set->page->min] = keylen;
        memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
 
+       //      find first empty slot
+
        for( idx = slot; idx < set->page->cnt; idx++ )
          if( slotptr(set->page, idx)->dead )
                break;
 
        // now insert key into array before slot
 
-       if( !reuse && idx == set->page->cnt )
-               idx++, set->page->cnt++;
+       if( idx == set->page->cnt )
+               idx += 1, set->page->cnt += 1;
 
        set->page->act++;
 
        while( idx > slot )
-               *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
+               *slotptr(set->page, idx) = *slotptr(set->page, idx - 1), idx--;
 
-       slotptr(set->page, slot)->off = set->page->min;
-       slotptr(set->page, slot)->dead = 0;
+       //      fill in new slot
+
+       node = slotptr(set->page, slot);
+       node->off = set->page->min;
+       node->dead = 0;
 
        bt_unlockpage (BtLockWrite, set->latch);
        bt_unpinlatch (set->latch);
@@ -2139,7 +2106,85 @@ BtVal val;
        return 0;
 }
 
-//  cache page of keys into cursor and return starting slot for given key
+//  Insert new key into the btree at given level.
+//     either add a new key or update/add an existing one
+
+BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
+{
+BtPageSet set[1];
+uint slot, idx;
+uid sequence;
+BtKey ptr;
+BtVal val;
+
+  while( 1 ) { // find the page and slot for the current key
+       if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
+               ptr = keyptr(set->page, slot);
+       else {
+               if( !bt->err )
+                       bt->err = BTERR_ovflw;
+               return bt->err;
+       }
+
+       //      check for adequate space on the page
+       //      and insert the new key before slot.
+
+       if( keycmp (ptr, key, keylen) ) {
+         if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
+               if( bt_splitpage (bt, set) )
+                 return bt->err;
+               else
+                 continue;
+
+         return bt_insertslot (bt, set, slot, key, keylen, value, vallen);
+       }
+
+       // if key already exists, update value and return
+
+       if( val = valptr(set->page, slot), val->len >= vallen ) {
+               if( slotptr(set->page, slot)->dead )
+                       set->page->act++;
+               slotptr(set->page, slot)->dead = 0;
+               set->page->dirty = 1;
+               val->len = vallen;
+               memcpy (val->value, value, vallen);
+               bt_unlockpage(BtLockWrite, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
+               return 0;
+       }
+
+       //      new update value doesn't fit in existing value area
+
+       if( !slotptr(set->page, slot)->dead )
+               set->page->dirty = 1;
+       else {
+               slotptr(set->page, slot)->dead = 0;
+               set->page->act++;
+       }
+
+       if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
+         if( bt_splitpage (bt, set) )
+               return bt->err;
+         else
+               continue;
+
+       set->page->min -= vallen + 1;
+       ((unsigned char *)set->page)[set->page->min] = vallen;
+       memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
+
+       set->page->min -= keylen + 1;
+       ((unsigned char *)set->page)[set->page->min] = keylen;
+       memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
+       
+       slotptr(set->page, slot)->off = set->page->min;
+       bt_unlockpage(BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
+       return 0;
+  }
+  return 0;
+}
 
 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
 {
@@ -2623,7 +2668,8 @@ BtDb *bt;
 #endif
        args = malloc (cnt * sizeof(ThreadArg));
 
-       mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
+//     mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
+       mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, 8191);
 
        if( !mgr ) {
                fprintf(stderr, "Index Open Error %s\n", argv[1]);
index 25501fde77ea14df59ce0904be2b58a74fcff5c7..3583b87a937f5e0052341f34eb3aa87237a99076 100644 (file)
@@ -2,7 +2,7 @@
 //     with reworked bt_deletekey code
 //     and phase-fair reader writer lock
 //     and librarian page split code
-// 02 SEP 2014
+// 08 SEP 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -147,10 +147,6 @@ typedef struct {
 
 //     Page key slot definition.
 
-//     If BT_maxbits is 15 or less, you can save 2 bytes
-//     for each key stored by making the two uints
-//     into ushorts.
-
 //     Keys are marked dead, but remain on the page until
 //     it cleanup is called. The fence key (highest key) for
 //     the page is always present, even after cleanup.
@@ -1571,14 +1567,11 @@ BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
 unsigned char leftkey[256], rightkey[256];
 unsigned char value[BtId];
 BtKey ptr;
-BtVal val;
 
        //      remove the old fence value
 
        ptr = keyptr(set->page, set->page->cnt);
-       val = valptr(set->page, set->page->cnt);
        memcpy (rightkey, ptr, ptr->len + 1);
-       set->page->garbage += ptr->len + val->len + 2;
        memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
 
        ptr = keyptr(set->page, set->page->cnt);
@@ -1648,7 +1641,7 @@ uint idx;
 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
 {
 unsigned char lowerfence[256], higherfence[256];
-uint slot, idx, dirty = 0, fence, found;
+uint slot, idx, found, fence;
 BtPageSet set[1], right[1];
 unsigned char value[BtId];
 BtKey ptr, tst;
@@ -1664,8 +1657,6 @@ BtVal val;
        if( slotptr(set->page, slot)->librarian )
                ptr = keyptr(set->page, ++slot);
 
-       //      are we deleting a fence slot?
-
        fence = slot == set->page->cnt;
 
        // if key is found delete it, otherwise ignore request
@@ -1673,11 +1664,11 @@ BtVal val;
        if( found = !keycmp (ptr, key, len) )
          if( found = slotptr(set->page, slot)->dead == 0 ) {
                val = valptr(set->page,slot);
-               dirty = slotptr(set->page, slot)->dead = 1;
+               slotptr(set->page, slot)->dead = 1;
                set->page->garbage += ptr->len + val->len + 2;
                set->page->act--;
 
-               // collapse empty slots beneath our slot
+               // collapse empty slots beneath the fence
 
                while( idx = set->page->cnt - 1 )
                  if( slotptr(set->page, idx)->dead ) {
@@ -1689,7 +1680,7 @@ BtVal val;
 
        //      did we delete a fence key in an upper level?
 
-       if( dirty && lvl && set->page->act && fence )
+       if( found && fence && lvl && set->page->act )
          if( bt_fixfence (bt, set, lvl) )
                return bt->err;
          else
@@ -1834,13 +1825,13 @@ uint newslot = max;
 BtKey key;
 BtVal val;
 
-       if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
+       if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
                return slot;
 
        //      skip cleanup and proceed to split
        //      if there's not enough garbage
 
-       if( page->garbage + page->min < 2 * page->act * sizeof(BtSlot) + sizeof(*page) + nxt / 3 )
+       if( page->garbage < nxt / 5 )
                return 0;
 
        memcpy (bt->frame, page, bt->mgr->page_size);
@@ -1875,9 +1866,11 @@ BtVal val;
 
                // make a librarian slot
 
-               slotptr(page, ++idx)->off = nxt;
-               slotptr(page, idx)->librarian = 1;
-               slotptr(page, idx)->dead = 1;
+               if( idx ) {
+                       slotptr(page, ++idx)->off = nxt;
+                       slotptr(page, idx)->librarian = 1;
+                       slotptr(page, idx)->dead = 1;
+               }
 
                // set up the slot
 
@@ -1892,7 +1885,7 @@ BtVal val;
 
        //      see if page has enough space now, or does it need splitting?
 
-       if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
+       if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
                return newslot;
 
        return 0;
@@ -1990,9 +1983,11 @@ BtVal val;
 
                //      add librarian slot
 
-               slotptr(bt->frame, ++idx)->off = nxt;
-               slotptr(bt->frame, idx)->librarian = 1;
-               slotptr(bt->frame, idx)->dead = 1;
+               if( idx ) {
+                       slotptr(bt->frame, ++idx)->off = nxt;
+                       slotptr(bt->frame, idx)->librarian = 1;
+                       slotptr(bt->frame, idx)->dead = 1;
+               }
 
                //  add actual slot
 
@@ -2051,9 +2046,11 @@ BtVal val;
 
                //      add librarian slot
 
-               slotptr(set->page, ++idx)->off = nxt;
-               slotptr(set->page, idx)->librarian = 1;
-               slotptr(set->page, idx)->dead = 1;
+               if( idx ) {
+                       slotptr(set->page, ++idx)->off = nxt;
+                       slotptr(set->page, idx)->librarian = 1;
+                       slotptr(set->page, idx)->dead = 1;
+               }
 
                //      add actual slot
 
@@ -2104,97 +2101,68 @@ BtVal val;
        bt_unpinlatch (right->latch);
        return 0;
 }
-//  Insert new key into the btree at given level.
-
-BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
-{
-BtPageSet set[1];
-uint slot, idx;
-uint reuse;
-BtKey ptr;
-BtKey tst;
-BtVal val;
-
-       while( 1 ) {
-               if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
-                       ptr = keyptr(set->page, slot);
-               else {
-                       if( !bt->err )
-                               bt->err = BTERR_ovflw;
-                       return bt->err;
-               }
-
-               // if librarian slot == found slot, advance to real slot
 
-               if( slotptr(set->page, slot)->librarian )
-                 if( !keycmp (ptr, key, keylen) )
-                       ptr = keyptr(set->page, ++slot);
+//     install new key and value onto page
+//     page must already be checked for
+//     adequate space
 
-               // if key already exists, update value and return
-
-               if( reuse = !keycmp (ptr, key, keylen) )
-                 if( val = valptr(set->page, slot), val->len >= vallen ) {
-                       if( slotptr(set->page, slot)->dead )
-                               set->page->act++;
-                       set->page->garbage += val->len - vallen;
-                       slotptr(set->page, slot)->dead = 0;
-                       val->len = vallen;
-                       memcpy (val->value, value, vallen);
-                       bt_unlockpage(BtLockWrite, set->latch);
-                       bt_unpinlatch (set->latch);
-                       bt_unpinpool (set->pool);
-                       return 0;
-                 } else {
-                       if( !slotptr(set->page, slot)->dead ) {
-                               set->page->garbage += val->len + ptr->len + 2;
-                               set->page->act--;
-                       }
-                       slotptr(set->page, slot)->dead = 1;
-                 }
-
-               //      if found slot > desired slot and previous slot
-               //      is a librarian slot, use it
-
-               if( !reuse && slot > 1 )
-                 if( slotptr(set->page, slot-1)->librarian )
-                       slot--;
-
-               // check if page has enough space
+BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen)
+{
+uint idx, librarian;
+BtSlot *node;
 
-               if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) )
-                       break;
+       //      if found slot > desired slot and previous slot
+       //      is a librarian slot, use it
 
-               if( bt_splitpage (bt, set) )
-                       return bt->err;
-       }
+       if( slot > 1 )
+         if( slotptr(set->page, slot-1)->librarian )
+               slot--;
 
-       // calculate next available slot and copy key into page
+       // copy value onto page
 
        set->page->min -= vallen + 1; // reset lowest used offset
        ((unsigned char *)set->page)[set->page->min] = vallen;
        memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
 
+       // copy key onto page
+
        set->page->min -= keylen + 1; // reset lowest used offset
        ((unsigned char *)set->page)[set->page->min] = keylen;
        memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
 
+       //      find first empty slot
+
        for( idx = slot; idx < set->page->cnt; idx++ )
          if( slotptr(set->page, idx)->dead )
                break;
 
        // now insert key into array before slot
 
-       if( !reuse && idx == set->page->cnt )
-               idx++, set->page->cnt++;
+       if( idx == set->page->cnt )
+               idx += 2, set->page->cnt += 2, librarian = 2;
+       else
+               librarian = 1;
 
        set->page->act++;
 
-       while( idx > slot )
-               *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
+       while( idx > slot + librarian - 1 )
+               *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
 
-       slotptr(set->page, slot)->off = set->page->min;
-       slotptr(set->page, slot)->librarian = 0;
-       slotptr(set->page, slot)->dead = 0;
+       //      add librarian slot
+
+       if( librarian > 1 ) {
+               node = slotptr(set->page, slot++);
+               node->off = set->page->min;
+               node->librarian = 1;
+               node->dead = 1;
+       }
+
+       //      fill in new slot
+
+       node = slotptr(set->page, slot);
+       node->off = set->page->min;
+       node->librarian = 0;
+       node->dead = 0;
 
        bt_unlockpage (BtLockWrite, set->latch);
        bt_unpinlatch (set->latch);
@@ -2202,6 +2170,93 @@ BtVal val;
        return 0;
 }
 
+//  Insert new key into the btree at given level.
+//     either add a new key or update/add an existing one
+
+BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
+{
+BtPageSet set[1];
+uint slot, idx;
+uid sequence;
+BtKey ptr;
+BtVal val;
+uint type;
+
+  while( 1 ) { // find the page and slot for the current key
+       if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
+               ptr = keyptr(set->page, slot);
+       else {
+               if( !bt->err )
+                       bt->err = BTERR_ovflw;
+               return bt->err;
+       }
+
+       // if librarian slot == found slot, advance to real slot
+
+       if( slotptr(set->page, slot)->librarian )
+         if( !keycmp (ptr, key, keylen) )
+               ptr = keyptr(set->page, ++slot);
+
+       //      check for adequate space on the page
+       //      and insert the new key before slot.
+
+       if( keycmp (ptr, key, keylen) ) {
+         if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
+               if( bt_splitpage (bt, set) )
+                 return bt->err;
+               else
+                 continue;
+
+         return bt_insertslot (bt, set, slot, key, keylen, value, vallen);
+       }
+
+       // if key already exists, update value and return
+
+       if( val = valptr(set->page, slot), val->len >= vallen ) {
+               if( slotptr(set->page, slot)->dead )
+                       set->page->act++;
+               set->page->garbage += val->len - vallen;
+               slotptr(set->page, slot)->dead = 0;
+               val->len = vallen;
+               memcpy (val->value, value, vallen);
+               bt_unlockpage(BtLockWrite, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
+               return 0;
+       }
+
+       //      new update value doesn't fit in existing value area
+
+       if( !slotptr(set->page, slot)->dead )
+               set->page->garbage += val->len + ptr->len + 2;
+       else {
+               slotptr(set->page, slot)->dead = 0;
+               set->page->act++;
+       }
+
+       if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
+         if( bt_splitpage (bt, set) )
+               return bt->err;
+         else
+               continue;
+
+       set->page->min -= vallen + 1;
+       ((unsigned char *)set->page)[set->page->min] = vallen;
+       memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
+
+       set->page->min -= keylen + 1;
+       ((unsigned char *)set->page)[set->page->min] = keylen;
+       memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
+       
+       slotptr(set->page, slot)->off = set->page->min;
+       bt_unlockpage(BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
+       return 0;
+  }
+  return 0;
+}
+
 //  cache page of keys into cursor and return starting slot for given key
 
 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
@@ -2518,11 +2573,12 @@ FILE *in;
 
                          if( bt_deletekey (bt, key, len, 0) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
+                         found += bt->found;
                          len = 0;
                        }
                        else if( len < 255 )
                                key[len++] = ch;
-               fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
+               fprintf(stderr, "finished %s for %d keys, found %d \n", args->infile, line, found);
                break;
 
        case 'f':
@@ -2559,7 +2615,6 @@ FILE *in;
                        set->latch = bt_pinlatch (bt, page_no);
                        bt_lockpage (BtLockRead, set->latch);
                        next = bt_getid (set->page->right);
-                       cnt += set->page->act;
 
                        for( slot = 0; slot++ < set->page->cnt; )
                         if( next || slot < set->page->cnt )
@@ -2567,6 +2622,7 @@ FILE *in;
                                ptr = keyptr(set->page, slot);
                                fwrite (ptr->key, ptr->len, 1, stdout);
                                fputc ('\n', stdout);
+                               cnt++;
                          }
 
                        bt_unlockpage (BtLockRead, set->latch);
@@ -2574,7 +2630,6 @@ FILE *in;
                        bt_unpinpool (set->pool);
                } while( page_no = next );
 
-               cnt--;  // remove stopper key
                fprintf(stderr, " Total keys read %d\n", cnt);
                break;