X-Git-Url: https://pd.if.org/git/?p=btree;a=blobdiff_plain;f=threadskv1.c;h=2d95c4a6a3a1c543cbacc716950ab561e1516228;hp=57111acb3f7544143bf0bb9d0daf2d028043a3f6;hb=HEAD;hpb=433c48a8b08c7140d6b103fba147635054d31281 diff --git a/threadskv1.c b/threadskv1.c index 57111ac..2d95c4a 100644 --- a/threadskv1.c +++ b/threadskv1.c @@ -1,7 +1,7 @@ // btree version threadskv1 sched_yield version // with reworked bt_deletekey code // and phase-fair reader writer lock -// 12 MAR 2014 +// 08 SEP 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -281,7 +281,7 @@ typedef enum { // B-Tree functions extern void bt_close (BtDb *bt); extern BtDb *bt_open (BtMgr *mgr); -extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, unsigned char *value, uint vallen); +extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen); extern BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl); extern int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen); extern uint bt_startkey (BtDb *bt, unsigned char *key, uint len); @@ -292,15 +292,16 @@ extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segs void bt_mgrclose (BtMgr *mgr); // Helper functions to return slot values +// from the cursor page. extern BtKey bt_key (BtDb *bt, uint slot); extern BtVal bt_val (BtDb *bt, uint slot); // BTree page number constants -#define ALLOC_page 0 // allocation & lock manager hash table +#define ALLOC_page 0 // allocation & latch manager hash table #define ROOT_page 1 // root of the btree #define LEAF_page 2 // first page of leaves -#define LATCH_page 3 // pages for lock manager +#define LATCH_page 3 // pages for latch manager // Number of levels to create in a new BTree @@ -309,7 +310,7 @@ extern BtVal bt_val (BtDb *bt, uint slot); // The page is allocated from low and hi ends. // The key slots are allocated from the bottom, // while the text and value of the key -// is allocated from the top. When the two +// are allocated from the top. When the two // areas meet, the page is split into two. // A key consists of a length byte, two bytes of @@ -741,7 +742,7 @@ uint slot; pool = mgr->pool + slot; if( pool->slot ) #ifdef unix - munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits); + munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits); #else { FlushViewOfFile(pool->map, 0); @@ -753,7 +754,7 @@ uint slot; #ifdef unix munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size); - munmap (mgr->latchmgr, mgr->page_size); + munmap (mgr->latchmgr, 2 * mgr->page_size); #else FlushViewOfFile(mgr->latchmgr, 0); UnmapViewOfFile(mgr->latchmgr); @@ -992,13 +993,18 @@ SYSTEM_INFO sysinfo[1]; mgrlatch: #ifdef unix + // mlock the root page and the latchmgr page + flag = PROT_READ | PROT_WRITE; - mgr->latchmgr = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size); + mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size); if( mgr->latchmgr == MAP_FAILED ) return bt_mgrclose (mgr), NULL; + mlock (mgr->latchmgr, 2 * mgr->page_size); + mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size); if( mgr->latchsets == MAP_FAILED ) return bt_mgrclose (mgr), NULL; + mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size); #else flag = PAGE_READWRITE; mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL); @@ -1119,7 +1125,7 @@ int flag; #ifdef unix flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE ); - pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off); + pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off); if( pool->map == MAP_FAILED ) return bt->err = BTERR_map; @@ -1130,7 +1136,7 @@ int flag; return bt->err = BTERR_map; flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE ); - pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits); + pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits); if( !pool->map ) return bt->err = BTERR_map; #endif @@ -1145,6 +1151,7 @@ uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping BtPage page; page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits)); +// madvise (page, bt->mgr->page_size, MADV_WILLNEED); return page; } @@ -1265,7 +1272,7 @@ BtPool *pool, *node, *next; // remove old file mapping #ifdef unix - munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits); + munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits); #else // FlushViewOfFile(pool->map, 0); UnmapViewOfFile(pool->map); @@ -1497,8 +1504,10 @@ BtPool *prevpool; // find key on page at this level // and descend to requested level - if( !set->page->kill ) - if( slot = bt_findslot (set, key, len) ) { + if( set->page->kill ) + goto slideright; + + if( slot = bt_findslot (set, key, len) ) { if( drill == lvl ) return slot; @@ -1511,7 +1520,7 @@ BtPool *prevpool; page_no = bt_getid(valptr(set->page, slot)->value); drill--; continue; - } + } // or slide right into next page @@ -1559,7 +1568,6 @@ BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl) { unsigned char leftkey[256], rightkey[256]; unsigned char value[BtId]; -uid page_no; BtKey ptr; // remove the old fence value @@ -1572,14 +1580,13 @@ BtKey ptr; ptr = keyptr(set->page, set->page->cnt); memcpy (leftkey, ptr, ptr->len + 1); - page_no = set->page_no; bt_lockpage (BtLockParent, set->latch); bt_unlockpage (BtLockWrite, set->latch); // insert new (now smaller) fence key - bt_putid (value, page_no); + bt_putid (value, set->page_no); if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) ) return bt->err; @@ -1638,7 +1645,7 @@ uint idx; BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl) { unsigned char lowerfence[256], higherfence[256]; -uint slot, idx, dirty = 0, fence, found; +uint slot, idx, found, fence; BtPageSet set[1], right[1]; unsigned char value[BtId]; BtKey ptr; @@ -1648,7 +1655,7 @@ BtKey ptr; else return bt->err; - // are we deleting a fence slot? + // are we deleting a fence key? fence = slot == set->page->cnt; @@ -1656,11 +1663,11 @@ BtKey ptr; if( found = !keycmp (ptr, key, len) ) if( found = slotptr(set->page, slot)->dead == 0 ) { - dirty = slotptr(set->page, slot)->dead = 1; + slotptr(set->page, slot)->dead = 1; set->page->dirty = 1; set->page->act--; - // collapse empty slots + // collapse empty slots beneath our fence while( idx = set->page->cnt - 1 ) if( slotptr(set->page, idx)->dead ) { @@ -1672,7 +1679,7 @@ BtKey ptr; // did we delete a fence key in an upper level? - if( dirty && lvl && set->page->act && fence ) + if( found && fence && lvl && set->page->act ) if( bt_fixfence (bt, set, lvl) ) return bt->err; else @@ -1780,10 +1787,10 @@ int ret; else return 0; - // if key exists, return TRUE - // otherwise return FALSE + // if key exists, return >= 0 value bytes copied + // otherwise return (-1) - if( !keycmp (ptr, key, keylen) ) { + if( !slotptr(set->page, slot)->dead && !keycmp (ptr, key, keylen) ) { val = valptr (set->page,slot); if( valmax > val->len ) valmax = val->len; @@ -1837,22 +1844,22 @@ BtVal val; if( cnt < max && slotptr(bt->frame,cnt)->dead ) continue; - // copy the key across - - key = keyptr(bt->frame, cnt); - nxt -= key->len + 1; - memcpy ((unsigned char *)page + nxt, key, key->len + 1); - // copy the value across val = valptr(bt->frame, cnt); nxt -= val->len + 1; ((unsigned char *)page)[nxt] = val->len; - memcpy ((unsigned char *)page + nxt + 1, val, val->len); + memcpy ((unsigned char *)page + nxt + 1, val->value, val->len); + + // copy the key across + + key = keyptr(bt->frame, cnt); + nxt -= key->len + 1; + memcpy ((unsigned char *)page + nxt, key, key->len + 1); // set up the slot - slotptr(page, idx)->off = nxt; + slotptr(page, ++idx)->off = nxt; if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) ) page->act++; @@ -2050,81 +2057,48 @@ BtVal val; bt_unpinlatch (right->latch); return 0; } -// Insert new key into the btree at given level. +// install new key and value onto page +// page must already be checked for +// adequate space -BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, unsigned char *value, uint vallen) +BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen) { -BtPageSet set[1]; -uint slot, idx; -uint reuse; -BtKey ptr; -BtVal val; - - while( 1 ) { - if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) ) - ptr = keyptr(set->page, slot); - else - { - if( !bt->err ) - bt->err = BTERR_ovflw; - return bt->err; - } - - // if key already exists, update id and return - - if( reuse = !keycmp (ptr, key, keylen) ) - if( val = valptr(set->page, slot), val->len >= vallen ) { - if( slotptr(set->page, slot)->dead ) - set->page->act++; - slotptr(set->page, slot)->dead = 0; - val->len = vallen; - memcpy (val->value, value, vallen); - bt_unlockpage(BtLockWrite, set->latch); - bt_unpinlatch (set->latch); - bt_unpinpool (set->pool); - return 0; - } else { - if( !slotptr(set->page, slot)->dead ) - set->page->act--; - slotptr(set->page, slot)->dead = 1; - set->page->dirty = 1; - } - - // check if page has enough space - - if( slot = bt_cleanpage (bt, set->page, keylen, slot, vallen) ) - break; - - if( bt_splitpage (bt, set) ) - return bt->err; - } +BtSlot *node; +uint idx; - // calculate next available slot and copy key into page + // copy value onto page set->page->min -= vallen + 1; // reset lowest used offset ((unsigned char *)set->page)[set->page->min] = vallen; memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen ); + // copy key onto page + set->page->min -= keylen + 1; // reset lowest used offset ((unsigned char *)set->page)[set->page->min] = keylen; memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen ); + // find first empty slot + for( idx = slot; idx < set->page->cnt; idx++ ) if( slotptr(set->page, idx)->dead ) break; // now insert key into array before slot - if( !reuse && idx == set->page->cnt ) - idx++, set->page->cnt++; + if( idx == set->page->cnt ) + idx += 1, set->page->cnt += 1; set->page->act++; while( idx > slot ) - *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--; + *slotptr(set->page, idx) = *slotptr(set->page, idx - 1), idx--; - slotptr(set->page, slot)->off = set->page->min; - slotptr(set->page, slot)->dead = 0; + // fill in new slot + + node = slotptr(set->page, slot); + node->off = set->page->min; + node->dead = 0; bt_unlockpage (BtLockWrite, set->latch); bt_unpinlatch (set->latch); @@ -2132,7 +2106,85 @@ BtVal val; return 0; } -// cache page of keys into cursor and return starting slot for given key +// Insert new key into the btree at given level. +// either add a new key or update/add an existing one + +BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen) +{ +BtPageSet set[1]; +uint slot, idx; +uid sequence; +BtKey ptr; +BtVal val; + + while( 1 ) { // find the page and slot for the current key + if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) ) + ptr = keyptr(set->page, slot); + else { + if( !bt->err ) + bt->err = BTERR_ovflw; + return bt->err; + } + + // check for adequate space on the page + // and insert the new key before slot. + + if( keycmp (ptr, key, keylen) ) { + if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) ) + if( bt_splitpage (bt, set) ) + return bt->err; + else + continue; + + return bt_insertslot (bt, set, slot, key, keylen, value, vallen); + } + + // if key already exists, update value and return + + if( val = valptr(set->page, slot), val->len >= vallen ) { + if( slotptr(set->page, slot)->dead ) + set->page->act++; + slotptr(set->page, slot)->dead = 0; + set->page->dirty = 1; + val->len = vallen; + memcpy (val->value, value, vallen); + bt_unlockpage(BtLockWrite, set->latch); + bt_unpinlatch (set->latch); + bt_unpinpool (set->pool); + return 0; + } + + // new update value doesn't fit in existing value area + + if( !slotptr(set->page, slot)->dead ) + set->page->dirty = 1; + else { + slotptr(set->page, slot)->dead = 0; + set->page->act++; + } + + if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) ) + if( bt_splitpage (bt, set) ) + return bt->err; + else + continue; + + set->page->min -= vallen + 1; + ((unsigned char *)set->page)[set->page->min] = vallen; + memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen); + + set->page->min -= keylen + 1; + ((unsigned char *)set->page)[set->page->min] = keylen; + memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen); + + slotptr(set->page, slot)->off = set->page->min; + bt_unlockpage(BtLockWrite, set->latch); + bt_unpinlatch (set->latch); + bt_unpinpool (set->pool); + return 0; + } + return 0; +} uint bt_startkey (BtDb *bt, unsigned char *key, uint len) { @@ -2298,7 +2350,7 @@ BtKey ptr; if( *latch->parent->rin & MASK ) fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no); - memset ((ushort *)latch->access, 0, sizeof(RWLock)); + memset ((ushort *)latch->parent, 0, sizeof(RWLock)); if( latch->pin ) { fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no); @@ -2357,7 +2409,8 @@ BtKey ptr; } typedef struct { - char type, idx; + char idx; + char *type; char *infile; BtMgr *mgr; int num; @@ -2384,7 +2437,7 @@ FILE *in; bt = bt_open (args->mgr); - switch(args->type | 0x20) + switch(args->type[0] | 0x20) { case 'a': fprintf(stderr, "started latch mgr audit\n"); @@ -2445,6 +2498,8 @@ FILE *in; else if( args->num ) sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; + if( (args->type[1] | 0x20) == 'p' ) + len = 10; if( bt_deletekey (bt, key, len, 0) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); len = 0; @@ -2467,6 +2522,8 @@ FILE *in; else if( args->num ) sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9; + if( (args->type[1] | 0x20) == 'p' ) + len = 10; if( bt_findkey (bt, key, len, NULL, 0) == 0 ) found++; else if( bt->err ) @@ -2611,7 +2668,8 @@ BtDb *bt; #endif args = malloc (cnt * sizeof(ThreadArg)); - mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8); +// mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8); + mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, 8191); if( !mgr ) { fprintf(stderr, "Index Open Error %s\n", argv[1]); @@ -2622,7 +2680,7 @@ BtDb *bt; for( idx = 0; idx < cnt; idx++ ) { args[idx].infile = argv[idx + 7]; - args[idx].type = argv[2][0]; + args[idx].type = argv[2]; args[idx].mgr = mgr; args[idx].num = num; args[idx].idx = idx;