X-Git-Url: https://pd.if.org/git/?p=btree;a=blobdiff_plain;f=threadskv8.c;h=f2a23c7d1ecbfddbf4957e65831b386a973c9946;hp=44793a09d99365c97979c5bd79d44905ff555283;hb=077c0aa4f70ea6c8acceb715de0a7c7d28b32c73;hpb=3ba73f07ec76f27a3b97fb59fd838e4b129f2432 diff --git a/threadskv8.c b/threadskv8.c index 44793a0..f2a23c7 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -155,11 +155,6 @@ typedef struct { uint prev; // prev entry in hash table chain volatile ushort pin; // number of outstanding threads ushort dirty:1; // page in cache is dirty -#ifdef unix - pthread_t atomictid; // pid holding atomic lock -#else - uint atomictid; // pid holding atomic lock -#endif } BtLatchSet; // Define the length of the page record numbers @@ -189,7 +184,8 @@ typedef enum { Unique, Librarian, Duplicate, - Delete + Delete, + Update } BtSlotType; typedef struct { @@ -652,7 +648,6 @@ BtLatchSet *latch = bt->mgr->latchsets + slot; bt->mgr->latchsets[latch->next].prev = slot; bt->mgr->hashtable[hashidx].slot = slot; - memset (&latch->atomictid, 0, sizeof(latch->atomictid)); latch->page_no = page_no; latch->entry = slot; latch->split = 0; @@ -1139,10 +1134,6 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch) case BtLockAtomic: WriteLock (latch->atomic); break; - case BtLockAtomic | BtLockRead: - WriteLock (latch->atomic); - ReadLock (latch->readwr); - break; } } @@ -1167,12 +1158,6 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch) WriteRelease (latch->parent); break; case BtLockAtomic: - memset (&latch->atomictid, 0, sizeof(latch->atomictid)); - WriteRelease (latch->atomic); - break; - case BtLockAtomic | BtLockRead: - ReadRelease (latch->readwr); - memset (&latch->atomictid, 0, sizeof(latch->atomictid)); WriteRelease (latch->atomic); break; } @@ -1293,20 +1278,10 @@ uint mode, prevmode; prevpage = 0; } - // skip Atomic lock on leaf page we need to slide over - - if( !drill ) - if( mode & BtLockAtomic ) - if( pthread_equal (set->latch->atomictid, pthread_self()) ) - mode &= ~BtLockAtomic; - // obtain mode lock using lock chaining through AccessLock bt_lockpage(mode, set->latch); - if( mode & BtLockAtomic ) - set->latch->atomictid = pthread_self(); - if( set->page->free ) return bt->err = BTERR_struct, 0; @@ -1342,11 +1317,13 @@ uint mode, prevmode; if( drill == lvl ) return slot; + // find next non-dead slot -- the fence key if nothing else + while( slotptr(set->page, slot)->dead ) if( slot++ < set->page->cnt ) - continue; + continue; else - goto slideright; + return bt->err = BTERR_struct, 0; page_no = bt_getid(valptr(set->page, slot)->value); drill--; @@ -2258,11 +2235,87 @@ typedef struct { typedef struct { uid page_no; // page number for split leaf void *next; // next key to insert - uint entry:30; // latch table entry number + uint entry:29; // latch table entry number uint type:2; // 0 == insert, 1 == delete, 2 == free + uint nounlock:1; // don't unlock ParentModification unsigned char leafkey[BT_keyarray]; } AtomicKey; +// find and load leaf page for given key +// leave page Atomic locked and Read locked. + +int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len) +{ +BtLatchSet *prevlatch; +uid page_no; +uint slot; + + // find level zero page + + if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) ) + return 0; + + // find next non-dead entry on this page + // it will be the fence key if nothing else + + while( slotptr(set->page, slot)->dead ) + if( slot++ < set->page->cnt ) + continue; + else + return bt->err = BTERR_struct, 0; + + page_no = bt_getid(valptr(set->page, slot)->value); + prevlatch = set->latch; + + while( page_no ) { + if( set->latch = bt_pinlatch (bt, page_no, 1) ) + set->page = bt_mappage (bt, set->latch); + else + return 0; + + if( set->page->free || set->page->lvl ) + return bt->err = BTERR_struct, 0; + + // obtain read lock using lock chaining with Access mode + // release & unpin parent/left sibling page + + bt_lockpage(BtLockAccess, set->latch); + + bt_unlockpage(BtLockRead, prevlatch); + bt_unpinlatch (prevlatch); + + bt_lockpage(BtLockRead, set->latch); + bt_unlockpage(BtLockAccess, set->latch); + + // find key on page at this level + // and descend to requested level + + if( !set->page->kill ) + if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) { + bt_unlockpage(BtLockRead, set->latch); + bt_lockpage(BtLockAtomic, set->latch); + bt_lockpage(BtLockAccess, set->latch); + bt_lockpage(BtLockRead, set->latch); + bt_unlockpage(BtLockAccess, set->latch); + + if( slot = bt_findslot (set->page, key, len) ) + return slot; + + bt_unlockpage(BtLockAtomic, set->latch); + } + + // slide right into next page + + page_no = bt_getid(set->page->right); + prevlatch = set->latch; + } + + // return error on end of right chain + + bt->err = BTERR_struct; + return 0; // return error +} + // determine actual page where key is located // return slot number @@ -2352,6 +2405,18 @@ BtVal *val; return 0; } +uint bt_parentmatch (AtomicKey *head, uint entry) +{ +uint cnt = 0; + + if( head ) + do if( head->entry == entry ) + head->nounlock = 1, cnt++; + while( head = head->next ); + + return cnt; +} + // atomic modification of a batch of keys. // return -1 if BTERR is set @@ -2415,7 +2480,7 @@ int type; bt_unlockpage(BtLockRead, set->latch); if( !slot ) - if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) ) + if( slot = bt_atomicload(bt, set, key->key, key->len) ) set->latch->split = 0; else return -1; @@ -2551,12 +2616,11 @@ int type; // schedule prev fence key update ptr = keyptr(prev->page,prev->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; leaf->entry = prev->latch->entry; - leaf->next = NULL; leaf->type = 0; if( tail ) @@ -2596,12 +2660,11 @@ int type; // Process last page split in chain ptr = keyptr(prev->page,prev->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; leaf->entry = prev->latch->entry; - leaf->next = NULL; leaf->type = 0; if( tail ) @@ -2632,12 +2695,11 @@ int type; // Delete empty master's fence ptr = keyptr(prev->page,prev->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; leaf->entry = prev->latch->entry; - leaf->next = NULL; leaf->type = 1; if( tail ) @@ -2650,14 +2712,13 @@ int type; bt_lockpage(BtLockParent, prev->latch); bt_unlockpage(BtLockWrite, prev->latch); + // grab master's right sibling + if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) ) set->page = bt_mappage (bt, set->latch); else return -1; - // add page to our transaction - - bt_lockpage(BtLockAtomic, set->latch); bt_lockpage(BtLockWrite, set->latch); // pull contents over empty page @@ -2669,18 +2730,24 @@ int type; set->latch->dirty = 1; set->page->kill = 1; - // add new parent key for new master page contents - // delete it after parent posts the new master fence. + // add new fence key for new master page contents, delete + // right sibling after parent posts the new master fence. ptr = keyptr(set->page,set->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; leaf->entry = set->latch->entry; - leaf->next = NULL; leaf->type = 2; + // see if right sibling is not yet in the FIFO + + if( !bt_parentmatch (head, leaf->entry) ) + bt_lockpage(BtLockParent, set->latch); + + bt_unlockpage(BtLockWrite, set->latch); + if( tail ) tail->next = leaf; else @@ -2688,9 +2755,6 @@ int type; tail = leaf; -// bt_lockpage(BtLockParent, set->latch); - bt_unlockpage(BtLockWrite, set->latch); - // fix master's far right sibling's left pointer if( right = bt_getid (set->page->right) ) { @@ -2723,27 +2787,27 @@ int type; if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) return -1; - bt_unlockpage (BtLockParent, set->latch); break; case 1: // delete key if( bt_deletekey (bt, ptr->key, ptr->len, 1) ) return -1; - bt_unlockpage (BtLockParent, set->latch); break; case 2: // insert key & free if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) return -1; - bt_unlockpage (BtLockAtomic, set->latch); bt_lockpage (BtLockDelete, set->latch); bt_lockpage (BtLockWrite, set->latch); bt_freepage (bt, set); break; } + if( !leaf->nounlock ) + bt_unlockpage (BtLockParent, set->latch); + bt_unpinlatch (set->latch); tail = leaf->next; free (leaf); @@ -3115,7 +3179,7 @@ FILE *in; nxt -= 1; txn[nxt] = 10; slotptr(page,++cnt)->off = nxt; - slotptr(page,cnt)->type = Unique; + slotptr(page,cnt)->type = Delete; len = 0; if( cnt < args->num )