]> pd.if.org Git - btree/blobdiff - threadskv1.c
Fix small bug in main when there is less t han one input file
[btree] / threadskv1.c
index 57111acb3f7544143bf0bb9d0daf2d028043a3f6..2d95c4a6a3a1c543cbacc716950ab561e1516228 100644 (file)
@@ -1,7 +1,7 @@
 // btree version threadskv1 sched_yield version
 //     with reworked bt_deletekey code
 //     and phase-fair reader writer lock
-// 12 MAR 2014
+// 08 SEP 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -281,7 +281,7 @@ typedef enum {
 // B-Tree functions
 extern void bt_close (BtDb *bt);
 extern BtDb *bt_open (BtMgr *mgr);
-extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, unsigned char *value, uint vallen);
+extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen);
 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
@@ -292,15 +292,16 @@ extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segs
 void bt_mgrclose (BtMgr *mgr);
 
 //  Helper functions to return slot values
+//     from the cursor page.
 
 extern BtKey bt_key (BtDb *bt, uint slot);
 extern BtVal bt_val (BtDb *bt, uint slot);
 
 //  BTree page number constants
-#define ALLOC_page             0       // allocation & lock manager hash table
+#define ALLOC_page             0       // allocation & latch manager hash table
 #define ROOT_page              1       // root of the btree
 #define LEAF_page              2       // first page of leaves
-#define LATCH_page             3       // pages for lock manager
+#define LATCH_page             3       // pages for latch manager
 
 //     Number of levels to create in a new BTree
 
@@ -309,7 +310,7 @@ extern BtVal bt_val (BtDb *bt, uint slot);
 //  The page is allocated from low and hi ends.
 //  The key slots are allocated from the bottom,
 //     while the text and value of the key
-//  is allocated from the top.  When the two
+//  are allocated from the top.  When the two
 //  areas meet, the page is split into two.
 
 //  A key consists of a length byte, two bytes of
@@ -741,7 +742,7 @@ uint slot;
                pool = mgr->pool + slot;
                if( pool->slot )
 #ifdef unix
-                       munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits);
+                       munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
 #else
                {
                        FlushViewOfFile(pool->map, 0);
@@ -753,7 +754,7 @@ uint slot;
 
 #ifdef unix
        munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
-       munmap (mgr->latchmgr, mgr->page_size);
+       munmap (mgr->latchmgr, 2 * mgr->page_size);
 #else
        FlushViewOfFile(mgr->latchmgr, 0);
        UnmapViewOfFile(mgr->latchmgr);
@@ -992,13 +993,18 @@ SYSTEM_INFO sysinfo[1];
 
 mgrlatch:
 #ifdef unix
+       // mlock the root page and the latchmgr page
+
        flag = PROT_READ | PROT_WRITE;
-       mgr->latchmgr = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
+       mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
        if( mgr->latchmgr == MAP_FAILED )
                return bt_mgrclose (mgr), NULL;
+       mlock (mgr->latchmgr, 2 * mgr->page_size);
+
        mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
        if( mgr->latchsets == MAP_FAILED )
                return bt_mgrclose (mgr), NULL;
+       mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
 #else
        flag = PAGE_READWRITE;
        mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
@@ -1119,7 +1125,7 @@ int flag;
 
 #ifdef unix
        flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
-       pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
+       pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
        if( pool->map == MAP_FAILED )
                return bt->err = BTERR_map;
 
@@ -1130,7 +1136,7 @@ int flag;
                return bt->err = BTERR_map;
 
        flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
-       pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
+       pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
        if( !pool->map )
                return bt->err = BTERR_map;
 #endif
@@ -1145,6 +1151,7 @@ uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
 BtPage page;
 
        page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
+//     madvise (page, bt->mgr->page_size, MADV_WILLNEED);
        return page;
 }
 
@@ -1265,7 +1272,7 @@ BtPool *pool, *node, *next;
 
                //      remove old file mapping
 #ifdef unix
-               munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
+               munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
 #else
 //             FlushViewOfFile(pool->map, 0);
                UnmapViewOfFile(pool->map);
@@ -1497,8 +1504,10 @@ BtPool *prevpool;
        //  find key on page at this level
        //  and descend to requested level
 
-       if( !set->page->kill )
-        if( slot = bt_findslot (set, key, len) ) {
+       if( set->page->kill )
+         goto slideright;
+
+       if( slot = bt_findslot (set, key, len) ) {
          if( drill == lvl )
                return slot;
 
@@ -1511,7 +1520,7 @@ BtPool *prevpool;
          page_no = bt_getid(valptr(set->page, slot)->value);
          drill--;
          continue;
-        }
+       }
 
        //  or slide right into next page
 
@@ -1559,7 +1568,6 @@ BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
 {
 unsigned char leftkey[256], rightkey[256];
 unsigned char value[BtId];
-uid page_no;
 BtKey ptr;
 
        //      remove the old fence value
@@ -1572,14 +1580,13 @@ BtKey ptr;
 
        ptr = keyptr(set->page, set->page->cnt);
        memcpy (leftkey, ptr, ptr->len + 1);
-       page_no = set->page_no;
 
        bt_lockpage (BtLockParent, set->latch);
        bt_unlockpage (BtLockWrite, set->latch);
 
        //      insert new (now smaller) fence key
 
-       bt_putid (value, page_no);
+       bt_putid (value, set->page_no);
 
        if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) )
          return bt->err;
@@ -1638,7 +1645,7 @@ uint idx;
 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
 {
 unsigned char lowerfence[256], higherfence[256];
-uint slot, idx, dirty = 0, fence, found;
+uint slot, idx, found, fence;
 BtPageSet set[1], right[1];
 unsigned char value[BtId];
 BtKey ptr;
@@ -1648,7 +1655,7 @@ BtKey ptr;
        else
                return bt->err;
 
-       //      are we deleting a fence slot?
+       //      are we deleting a fence key?
 
        fence = slot == set->page->cnt;
 
@@ -1656,11 +1663,11 @@ BtKey ptr;
 
        if( found = !keycmp (ptr, key, len) )
          if( found = slotptr(set->page, slot)->dead == 0 ) {
-               dirty = slotptr(set->page, slot)->dead = 1;
+               slotptr(set->page, slot)->dead = 1;
                set->page->dirty = 1;
                set->page->act--;
 
-               // collapse empty slots
+               // collapse empty slots beneath our fence
 
                while( idx = set->page->cnt - 1 )
                  if( slotptr(set->page, idx)->dead ) {
@@ -1672,7 +1679,7 @@ BtKey ptr;
 
        //      did we delete a fence key in an upper level?
 
-       if( dirty && lvl && set->page->act && fence )
+       if( found && fence && lvl && set->page->act )
          if( bt_fixfence (bt, set, lvl) )
                return bt->err;
          else
@@ -1780,10 +1787,10 @@ int ret;
        else
                return 0;
 
-       // if key exists, return TRUE
-       //      otherwise return FALSE
+       // if key exists, return >= 0 value bytes copied
+       //      otherwise return (-1)
 
-       if( !keycmp (ptr, key, keylen) ) {
+       if( !slotptr(set->page, slot)->dead && !keycmp (ptr, key, keylen) ) {
                val = valptr (set->page,slot);
                if( valmax > val->len )
                        valmax = val->len;
@@ -1837,22 +1844,22 @@ BtVal val;
                if( cnt < max && slotptr(bt->frame,cnt)->dead )
                        continue;
 
-               // copy the key across
-
-               key = keyptr(bt->frame, cnt);
-               nxt -= key->len + 1;
-               memcpy ((unsigned char *)page + nxt, key, key->len + 1);
-
                // copy the value across
 
                val = valptr(bt->frame, cnt);
                nxt -= val->len + 1;
                ((unsigned char *)page)[nxt] = val->len;
-               memcpy ((unsigned char *)page + nxt + 1, val, val->len);
+               memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
+
+               // copy the key across
+
+               key = keyptr(bt->frame, cnt);
+               nxt -= key->len + 1;
+               memcpy ((unsigned char *)page + nxt, key, key->len + 1);
 
                // set up the slot
 
-               slotptr(page, idx)->off = nxt;
+               slotptr(page, ++idx)->off = nxt;
 
                if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
                        page->act++;
@@ -2050,81 +2057,48 @@ BtVal val;
        bt_unpinlatch (right->latch);
        return 0;
 }
-//  Insert new key into the btree at given level.
+//     install new key and value onto page
+//     page must already be checked for
+//     adequate space
 
-BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, unsigned char *value, uint vallen)
+BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen)
 {
-BtPageSet set[1];
-uint slot, idx;
-uint reuse;
-BtKey ptr;
-BtVal val;
-
-       while( 1 ) {
-               if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
-                       ptr = keyptr(set->page, slot);
-               else
-               {
-                       if( !bt->err )
-                               bt->err = BTERR_ovflw;
-                       return bt->err;
-               }
-
-               // if key already exists, update id and return
-
-               if( reuse = !keycmp (ptr, key, keylen) )
-                 if( val = valptr(set->page, slot), val->len >= vallen ) {
-                       if( slotptr(set->page, slot)->dead )
-                               set->page->act++;
-                       slotptr(set->page, slot)->dead = 0;
-                       val->len = vallen;
-                       memcpy (val->value, value, vallen);
-                       bt_unlockpage(BtLockWrite, set->latch);
-                       bt_unpinlatch (set->latch);
-                       bt_unpinpool (set->pool);
-                       return 0;
-                 } else {
-                       if( !slotptr(set->page, slot)->dead )
-                               set->page->act--;
-                       slotptr(set->page, slot)->dead = 1;
-                       set->page->dirty = 1;
-                 }
-
-               // check if page has enough space
-
-               if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) )
-                       break;
-
-               if( bt_splitpage (bt, set) )
-                       return bt->err;
-       }
+BtSlot *node;
+uint idx;
 
-       // calculate next available slot and copy key into page
+       // copy value onto page
 
        set->page->min -= vallen + 1; // reset lowest used offset
        ((unsigned char *)set->page)[set->page->min] = vallen;
        memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
 
+       // copy key onto page
+
        set->page->min -= keylen + 1; // reset lowest used offset
        ((unsigned char *)set->page)[set->page->min] = keylen;
        memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
 
+       //      find first empty slot
+
        for( idx = slot; idx < set->page->cnt; idx++ )
          if( slotptr(set->page, idx)->dead )
                break;
 
        // now insert key into array before slot
 
-       if( !reuse && idx == set->page->cnt )
-               idx++, set->page->cnt++;
+       if( idx == set->page->cnt )
+               idx += 1, set->page->cnt += 1;
 
        set->page->act++;
 
        while( idx > slot )
-               *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
+               *slotptr(set->page, idx) = *slotptr(set->page, idx - 1), idx--;
 
-       slotptr(set->page, slot)->off = set->page->min;
-       slotptr(set->page, slot)->dead = 0;
+       //      fill in new slot
+
+       node = slotptr(set->page, slot);
+       node->off = set->page->min;
+       node->dead = 0;
 
        bt_unlockpage (BtLockWrite, set->latch);
        bt_unpinlatch (set->latch);
@@ -2132,7 +2106,85 @@ BtVal val;
        return 0;
 }
 
-//  cache page of keys into cursor and return starting slot for given key
+//  Insert new key into the btree at given level.
+//     either add a new key or update/add an existing one
+
+BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
+{
+BtPageSet set[1];
+uint slot, idx;
+uid sequence;
+BtKey ptr;
+BtVal val;
+
+  while( 1 ) { // find the page and slot for the current key
+       if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
+               ptr = keyptr(set->page, slot);
+       else {
+               if( !bt->err )
+                       bt->err = BTERR_ovflw;
+               return bt->err;
+       }
+
+       //      check for adequate space on the page
+       //      and insert the new key before slot.
+
+       if( keycmp (ptr, key, keylen) ) {
+         if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
+               if( bt_splitpage (bt, set) )
+                 return bt->err;
+               else
+                 continue;
+
+         return bt_insertslot (bt, set, slot, key, keylen, value, vallen);
+       }
+
+       // if key already exists, update value and return
+
+       if( val = valptr(set->page, slot), val->len >= vallen ) {
+               if( slotptr(set->page, slot)->dead )
+                       set->page->act++;
+               slotptr(set->page, slot)->dead = 0;
+               set->page->dirty = 1;
+               val->len = vallen;
+               memcpy (val->value, value, vallen);
+               bt_unlockpage(BtLockWrite, set->latch);
+               bt_unpinlatch (set->latch);
+               bt_unpinpool (set->pool);
+               return 0;
+       }
+
+       //      new update value doesn't fit in existing value area
+
+       if( !slotptr(set->page, slot)->dead )
+               set->page->dirty = 1;
+       else {
+               slotptr(set->page, slot)->dead = 0;
+               set->page->act++;
+       }
+
+       if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
+         if( bt_splitpage (bt, set) )
+               return bt->err;
+         else
+               continue;
+
+       set->page->min -= vallen + 1;
+       ((unsigned char *)set->page)[set->page->min] = vallen;
+       memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
+
+       set->page->min -= keylen + 1;
+       ((unsigned char *)set->page)[set->page->min] = keylen;
+       memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
+       
+       slotptr(set->page, slot)->off = set->page->min;
+       bt_unlockpage(BtLockWrite, set->latch);
+       bt_unpinlatch (set->latch);
+       bt_unpinpool (set->pool);
+       return 0;
+  }
+  return 0;
+}
 
 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
 {
@@ -2298,7 +2350,7 @@ BtKey ptr;
 
                if( *latch->parent->rin & MASK )
                        fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
-               memset ((ushort *)latch->access, 0, sizeof(RWLock));
+               memset ((ushort *)latch->parent, 0, sizeof(RWLock));
 
                if( latch->pin ) {
                        fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
@@ -2357,7 +2409,8 @@ BtKey ptr;
 }
 
 typedef struct {
-       char type, idx;
+       char idx;
+       char *type;
        char *infile;
        BtMgr *mgr;
        int num;
@@ -2384,7 +2437,7 @@ FILE *in;
 
        bt = bt_open (args->mgr);
 
-       switch(args->type | 0x20)
+       switch(args->type[0] | 0x20)
        {
        case 'a':
                fprintf(stderr, "started latch mgr audit\n");
@@ -2445,6 +2498,8 @@ FILE *in;
                          else if( args->num )
                                sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
 
+                         if( (args->type[1] | 0x20) == 'p' )
+                               len = 10;
                          if( bt_deletekey (bt, key, len, 0) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
@@ -2467,6 +2522,8 @@ FILE *in;
                          else if( args->num )
                                sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
 
+                         if( (args->type[1] | 0x20) == 'p' )
+                               len = 10;
                          if( bt_findkey (bt, key, len, NULL, 0) == 0 )
                                found++;
                          else if( bt->err )
@@ -2611,7 +2668,8 @@ BtDb *bt;
 #endif
        args = malloc (cnt * sizeof(ThreadArg));
 
-       mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
+//     mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
+       mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, 8191);
 
        if( !mgr ) {
                fprintf(stderr, "Index Open Error %s\n", argv[1]);
@@ -2622,7 +2680,7 @@ BtDb *bt;
 
        for( idx = 0; idx < cnt; idx++ ) {
                args[idx].infile = argv[idx + 7];
-               args[idx].type = argv[2][0];
+               args[idx].type = argv[2];
                args[idx].mgr = mgr;
                args[idx].num = num;
                args[idx].idx = idx;