From 077c0aa4f70ea6c8acceb715de0a7c7d28b32c73 Mon Sep 17 00:00:00 2001 From: unknown Date: Sat, 27 Sep 2014 03:48:03 -0700 Subject: [PATCH] clean up atomic delete deadlock problem --- threadskv8.c | 57 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/threadskv8.c b/threadskv8.c index 574bbd9..f2a23c7 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -184,7 +184,8 @@ typedef enum { Unique, Librarian, Duplicate, - Delete + Delete, + Update } BtSlotType; typedef struct { @@ -2234,8 +2235,9 @@ typedef struct { typedef struct { uid page_no; // page number for split leaf void *next; // next key to insert - uint entry:30; // latch table entry number + uint entry:29; // latch table entry number uint type:2; // 0 == insert, 1 == delete, 2 == free + uint nounlock:1; // don't unlock ParentModification unsigned char leafkey[BT_keyarray]; } AtomicKey; @@ -2403,6 +2405,18 @@ BtVal *val; return 0; } +uint bt_parentmatch (AtomicKey *head, uint entry) +{ +uint cnt = 0; + + if( head ) + do if( head->entry == entry ) + head->nounlock = 1, cnt++; + while( head = head->next ); + + return cnt; +} + // atomic modification of a batch of keys. // return -1 if BTERR is set @@ -2602,12 +2616,11 @@ int type; // schedule prev fence key update ptr = keyptr(prev->page,prev->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; leaf->entry = prev->latch->entry; - leaf->next = NULL; leaf->type = 0; if( tail ) @@ -2647,12 +2660,11 @@ int type; // Process last page split in chain ptr = keyptr(prev->page,prev->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; leaf->entry = prev->latch->entry; - leaf->next = NULL; leaf->type = 0; if( tail ) @@ -2683,12 +2695,11 @@ int type; // Delete empty master's fence ptr = keyptr(prev->page,prev->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; leaf->entry = prev->latch->entry; - leaf->next = NULL; leaf->type = 1; if( tail ) @@ -2701,14 +2712,13 @@ int type; bt_lockpage(BtLockParent, prev->latch); bt_unlockpage(BtLockWrite, prev->latch); + // grab master's right sibling + if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) ) set->page = bt_mappage (bt, set->latch); else return -1; - // add page to our transaction - - bt_lockpage(BtLockAtomic, set->latch); bt_lockpage(BtLockWrite, set->latch); // pull contents over empty page @@ -2720,18 +2730,24 @@ int type; set->latch->dirty = 1; set->page->kill = 1; - // add new parent key for new master page contents - // delete it after parent posts the new master fence. + // add new fence key for new master page contents, delete + // right sibling after parent posts the new master fence. ptr = keyptr(set->page,set->page->cnt); - leaf = malloc (sizeof(AtomicKey)); + leaf = calloc (sizeof(AtomicKey), 1); memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); leaf->page_no = prev->latch->page_no; leaf->entry = set->latch->entry; - leaf->next = NULL; leaf->type = 2; + // see if right sibling is not yet in the FIFO + + if( !bt_parentmatch (head, leaf->entry) ) + bt_lockpage(BtLockParent, set->latch); + + bt_unlockpage(BtLockWrite, set->latch); + if( tail ) tail->next = leaf; else @@ -2739,9 +2755,6 @@ int type; tail = leaf; -// bt_lockpage(BtLockParent, set->latch); - bt_unlockpage(BtLockWrite, set->latch); - // fix master's far right sibling's left pointer if( right = bt_getid (set->page->right) ) { @@ -2774,27 +2787,27 @@ int type; if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) return -1; - bt_unlockpage (BtLockParent, set->latch); break; case 1: // delete key if( bt_deletekey (bt, ptr->key, ptr->len, 1) ) return -1; - bt_unlockpage (BtLockParent, set->latch); break; case 2: // insert key & free if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) return -1; - bt_unlockpage (BtLockAtomic, set->latch); bt_lockpage (BtLockDelete, set->latch); bt_lockpage (BtLockWrite, set->latch); bt_freepage (bt, set); break; } + if( !leaf->nounlock ) + bt_unlockpage (BtLockParent, set->latch); + bt_unpinlatch (set->latch); tail = leaf->next; free (leaf); @@ -3166,7 +3179,7 @@ FILE *in; nxt -= 1; txn[nxt] = 10; slotptr(page,++cnt)->off = nxt; - slotptr(page,cnt)->type = Unique; + slotptr(page,cnt)->type = Delete; len = 0; if( cnt < args->num ) -- 2.40.0