From: unknown Date: Fri, 26 Sep 2014 21:35:42 +0000 (-0700) Subject: Fix bt_atomictxn to process delete key requests properly X-Git-Url: https://pd.if.org/git/?p=btree;a=commitdiff_plain;h=3ba73f07ec76f27a3b97fb59fd838e4b129f2432 Fix bt_atomictxn to process delete key requests properly --- diff --git a/threadskv8.c b/threadskv8.c index 3bae89e..44793a0 100644 --- a/threadskv8.c +++ b/threadskv8.c @@ -3,10 +3,11 @@ // phase-fair reader writer lock, // librarian page split code, // duplicate key management +// bi-directional cursors // traditional buffer pool manager -// and ACID batched key updates +// and ACID batched key-value updates -// 21 SEP 2014 +// 26 SEP 2014 // author: karl malbrain, malbrain@cal.berkeley.edu @@ -235,6 +236,8 @@ typedef struct BtPage_ { unsigned char lvl:7; // level of page unsigned char kill:1; // page is being deleted unsigned char right[BtId]; // page number to right + unsigned char left[BtId]; // page number to left + unsigned char filler[2]; // padding to multiple of 8 } *BtPage; // The loadpage interface object @@ -977,6 +980,11 @@ BtVal *val; pagezero->alloc->bits = mgr->page_bits; bt_putid(pagezero->alloc->right, MIN_lvl+1); + // initialize left-most LEAF page in + // alloc->left. + + bt_putid (pagezero->alloc->left, LEAF_page); + if( bt_writepage (mgr, pagezero->alloc, 0) ) { fprintf (stderr, "Unable to create btree page zero\n"); return bt_mgrclose (mgr), NULL; @@ -1277,7 +1285,7 @@ uint mode, prevmode; set->page = bt_mappage (bt, set->latch); - // release & unpin parent page + // release & unpin parent or left sibling page if( prevpage ) { bt_unlockpage(prevmode, prevlatch); @@ -1285,7 +1293,7 @@ uint mode, prevmode; prevpage = 0; } - // skip Atomic lock on leaf page if already held + // skip Atomic lock on leaf page we need to slide over if( !drill ) if( mode & BtLockAtomic ) @@ -1378,7 +1386,6 @@ void bt_freepage (BtDb *bt, BtPageSet *set) bt_unlockpage (BtLockDelete, set->latch); bt_unlockpage (BtLockWrite, set->latch); - bt_unpinlatch (set->latch); // unlock allocation page @@ -1460,6 +1467,7 @@ uint idx; root->latch->dirty = 1; bt_freepage (bt, child); + bt_unpinlatch (child->latch); } while( root->page->lvl > 1 && root->page->act == 1 ); @@ -1546,6 +1554,7 @@ BtKey *ptr; bt_lockpage (BtLockDelete, right->latch); bt_lockpage (BtLockWrite, right->latch); bt_freepage (bt, right); + bt_unpinlatch (right->latch); bt_unlockpage (BtLockParent, set->latch); bt_unpinlatch (set->latch); @@ -2242,20 +2251,20 @@ uint type; typedef struct { uint entry; // latch table entry number - uint slot:30; // page slot number + uint slot:31; // page slot number uint reuse:1; // reused previous page - uint emptied:1; // page was emptied } AtomicMod; typedef struct { uid page_no; // page number for split leaf void *next; // next key to insert - uint entry; // latch table entry number + uint entry:30; // latch table entry number + uint type:2; // 0 == insert, 1 == delete, 2 == free unsigned char leafkey[BT_keyarray]; } AtomicKey; // determine actual page where key is located -// return slot number with set page locked +// return slot number uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set) { @@ -2338,31 +2347,11 @@ BtVal *val; val = valptr(set->page, slot); set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal); - set->page->act--; - - // collapse empty slots beneath the fence - - while( idx = set->page->cnt - 1 ) - if( slotptr(set->page, idx)->dead ) { - *slotptr(set->page, idx) = *slotptr(set->page, idx + 1); - memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot)); - } else - break; - set->latch->dirty = 1; + set->page->act--; return 0; } -// qsort comparison function - -int bt_qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page) -{ -BtKey *key1 = (BtKey *)((unsigned char *)page + slot1->off); -BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off); - - return keycmp (key1, key2->key, key2->len); -} - // atomic modification of a batch of keys. // return -1 if BTERR is set @@ -2370,9 +2359,9 @@ BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off); // causing the key constraint violation // or zero on successful completion. -int bt_atomicmods (BtDb *bt, BtPage source) +int bt_atomictxn (BtDb *bt, BtPage source) { -uint src, idx, slot, samepage, entry, next, split; +uint src, idx, slot, samepage, entry; AtomicKey *head, *tail, *leaf; BtPageSet set[1], prev[1]; unsigned char value[BtId]; @@ -2383,16 +2372,16 @@ int result = 0; BtSlot temp[1]; BtPage page; BtVal *val; +uid right; int type; locks = calloc (source->cnt + 1, sizeof(AtomicMod)); head = NULL; tail = NULL; - // first sort the list of keys into order to + // stable sort the list of keys into order to // prevent deadlocks between threads. - qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)bt_qsortcmp, source); -/* + for( src = 1; src++ < source->cnt; ) { *temp = *slotptr(source,src); key = keyptr (source,src); @@ -2406,13 +2395,13 @@ int type; break; } } -*/ - // Load the leafpage for each key + + // Load the leaf page for each key + // group same page references with reuse bit // and determine any constraint violations for( src = 0; src++ < source->cnt; ) { key = keyptr(source, src); - val = valptr(source, src); slot = 0; // first determine if this modification falls @@ -2420,7 +2409,7 @@ int type; // note that the far right leaf page is a special case if( samepage = src > 1 ) - if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 ) + if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 ) slot = bt_findslot(set->page, key->key, key->len); else // release read on previous page bt_unlockpage(BtLockRead, set->latch); @@ -2451,50 +2440,26 @@ int type; case Unique: if( !slotptr(set->page, slot)->dead ) if( slot < set->page->cnt || bt_getid (set->page->right) ) - if( ptr->len == key->len && !memcmp (ptr->key, key->key, key->len) ) { + if( !keycmp (ptr, key->key, key->len) ) { // return constraint violation if key already exists + bt_unlockpage(BtLockRead, set->latch); result = src; while( src ) { if( locks[src].entry ) { set->latch = bt->mgr->latchsets + locks[src].entry; bt_unlockpage(BtLockAtomic, set->latch); - bt_unlockpage(BtLockRead, set->latch); bt_unpinlatch (set->latch); } src--; } - + free (locks); return result; } - break; - - case Delete: - if( !slotptr(set->page, slot)->dead ) - if( ptr->len == key->len ) - if( !memcmp (ptr->key, key->key, key->len) ) - break; - - // Key to delete doesn't exist - - result = src; - - while( src ) { - if( locks[src].entry ) { - set->latch = bt->mgr->latchsets + locks[src].entry; - bt_unlockpage(BtLockAtomic, set->latch); - bt_unlockpage(BtLockRead, set->latch); - bt_unpinlatch (set->latch); - } - src--; - } - - return result; } - } // unlock last loadpage lock @@ -2502,69 +2467,88 @@ int type; if( source->cnt > 1 ) bt_unlockpage(BtLockRead, set->latch); - // obtain write lock for each page + // obtain write lock for each master page for( src = 0; src++ < source->cnt; ) - if( locks[src].entry ) + if( locks[src].reuse ) + continue; + else bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry); // insert or delete each key + // process any splits or merges + // release Write & Atomic latches + // set ParentModifications and build + // queue of keys to insert for split pages + // or delete for deleted pages. - for( src = 0; src++ < source->cnt; ) - if( slotptr(source,src)->type == Delete ) - if( bt_atomicdelete (bt, source, locks, src) ) - return -1; - else - continue; - else - if( bt_atomicinsert (bt, source, locks, src) ) - return -1; - else - continue; + // run through txn list backwards - // set ParentModification and release WriteLock - // leave empty pages locked for later processing - // build queue of keys to insert for split pages + samepage = source->cnt + 1; - for( src = 0; src++ < source->cnt; ) { + for( src = source->cnt; src; src-- ) { if( locks[src].reuse ) continue; - prev->latch = bt->mgr->latchsets + locks[src].entry; - prev->page = bt_mappage (bt, prev->latch); + // perform the txn operations + // from smaller to larger on + // the same page + + for( idx = src; idx < samepage; idx++ ) + switch( slotptr(source,idx)->type ) { + case Delete: + if( bt_atomicdelete (bt, source, locks, idx) ) + return -1; + break; - // pick-up all splits from original page + case Duplicate: + case Unique: + if( bt_atomicinsert (bt, source, locks, idx) ) + return -1; + break; + } + + // after the same page operations have finished, + // process master page for splits or deletion. - split = next = prev->latch->split; + latch = prev->latch = bt->mgr->latchsets + locks[src].entry; + prev->page = bt_mappage (bt, prev->latch); + samepage = src; - if( !prev->page->act ) - locks[src].emptied = 1; + // pick-up all splits from master page - while( entry = next ) { + entry = prev->latch->split; + + while( entry ) { set->latch = bt->mgr->latchsets + entry; set->page = bt_mappage (bt, set->latch); - next = set->latch->split; - set->latch->split = 0; + entry = set->latch->split; - // delete empty previous page + // delete empty master page if( !prev->page->act ) { + memcpy (set->page->left, prev->page->left, BtId); memcpy (prev->page, set->page, bt->mgr->page_size); bt_lockpage (BtLockDelete, set->latch); bt_freepage (bt, set); + bt_unpinlatch (set->latch); prev->latch->dirty = 1; + continue; + } - if( prev->page->act ) - locks[src].emptied = 0; + // remove empty page from the split chain + if( !set->page->act ) { + memcpy (prev->page->right, set->page->right, BtId); + prev->latch->split = set->latch->split; + bt_lockpage (BtLockDelete, set->latch); + bt_freepage (bt, set); + bt_unpinlatch (set->latch); continue; } - // prev page is not emptied - locks[src].emptied = 0; - - // schedule previous fence key update + // schedule prev fence key update ptr = keyptr(prev->page,prev->page->cnt); leaf = malloc (sizeof(AtomicKey)); @@ -2573,6 +2557,7 @@ int type; leaf->page_no = prev->latch->page_no; leaf->entry = prev->latch->entry; leaf->next = NULL; + leaf->type = 0; if( tail ) tail->next = leaf; @@ -2581,31 +2566,70 @@ int type; tail = leaf; - // remove empty block from the split chain - - if( !set->page->act ) { - memcpy (prev->page->right, set->page->right, BtId); - bt_lockpage (BtLockDelete, set->latch); - bt_freepage (bt, set); - continue; - } - + bt_putid (set->page->left, prev->latch->page_no); bt_lockpage(BtLockParent, prev->latch); bt_unlockpage(BtLockWrite, prev->latch); *prev = *set; } - // was entire chain emptied? + // update left pointer in next right page + // if we did any now non-empty splits - if( !prev->page->act ) - continue; + if( latch->split ) { + // fix left pointer in master's original right sibling + // or set rightmost page in page zero - if( !split ) { - bt_unlockpage(BtLockWrite, prev->latch); - continue; + if( right = bt_getid (prev->page->right) ) { + if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) ) + set->page = bt_mappage (bt, set->latch); + else + return -1; + + bt_lockpage (BtLockWrite, set->latch); + bt_putid (set->page->left, prev->latch->page_no); + set->latch->dirty = 1; + bt_unlockpage (BtLockWrite, set->latch); + bt_unpinlatch (set->latch); + } else + bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no); + + // Process last page split in chain + + ptr = keyptr(prev->page,prev->page->cnt); + leaf = malloc (sizeof(AtomicKey)); + + memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); + leaf->page_no = prev->latch->page_no; + leaf->entry = prev->latch->entry; + leaf->next = NULL; + leaf->type = 0; + + if( tail ) + tail->next = leaf; + else + head = leaf; + + tail = leaf; + + bt_lockpage(BtLockParent, prev->latch); + bt_unlockpage(BtLockWrite, prev->latch); + bt_unlockpage(BtLockAtomic, latch); + continue; } - // Process last page split in chain + // finished if prev page occupied (either master or final split) + + if( prev->page->act ) { + bt_unlockpage(BtLockWrite, latch); + bt_unlockpage(BtLockAtomic, latch); + bt_unpinlatch(latch); + continue; + } + + // any splits were reversed, and the + // master page located in prev is empty, delete it + // by pulling over master's right sibling. + // Delete empty master's fence ptr = keyptr(prev->page,prev->page->cnt); leaf = malloc (sizeof(AtomicKey)); @@ -2614,7 +2638,8 @@ int type; leaf->page_no = prev->latch->page_no; leaf->entry = prev->latch->entry; leaf->next = NULL; - + leaf->type = 1; + if( tail ) tail->next = leaf; else @@ -2624,48 +2649,102 @@ int type; bt_lockpage(BtLockParent, prev->latch); bt_unlockpage(BtLockWrite, prev->latch); - } + + if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) ) + set->page = bt_mappage (bt, set->latch); + else + return -1; + + // add page to our transaction + + bt_lockpage(BtLockAtomic, set->latch); + bt_lockpage(BtLockWrite, set->latch); + + // pull contents over empty page + + memcpy (set->page->left, prev->page->left, BtId); + memcpy (prev->page, set->page, bt->mgr->page_size); + + bt_putid (set->page->right, prev->latch->page_no); + set->latch->dirty = 1; + set->page->kill = 1; + + // add new parent key for new master page contents + // delete it after parent posts the new master fence. + + ptr = keyptr(set->page,set->page->cnt); + leaf = malloc (sizeof(AtomicKey)); + + memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey)); + leaf->page_no = prev->latch->page_no; + leaf->entry = set->latch->entry; + leaf->next = NULL; + leaf->type = 2; - // remove Atomic locks and - // process any empty pages + if( tail ) + tail->next = leaf; + else + head = leaf; - for( src = source->cnt; src; src-- ) { - if( locks[src].reuse ) - continue; + tail = leaf; - set->latch = bt->mgr->latchsets + locks[src].entry; - set->page = bt_mappage (bt, set->latch); +// bt_lockpage(BtLockParent, set->latch); + bt_unlockpage(BtLockWrite, set->latch); - // clear original page split field + // fix master's far right sibling's left pointer - split = set->latch->split; - set->latch->split = 0; - bt_unlockpage (BtLockAtomic, set->latch); + if( right = bt_getid (set->page->right) ) { + if( set->latch = bt_pinlatch (bt, right, 1) ) + set->page = bt_mappage (bt, set->latch); - // delete page emptied by our atomic action + bt_lockpage (BtLockWrite, set->latch); + bt_putid (set->page->left, prev->latch->page_no); + set->latch->dirty = 1; - if( locks[src].emptied ) - if( bt_deletepage (bt, set) ) - return bt->err; - else - continue; + bt_unlockpage (BtLockWrite, set->latch); + bt_unpinlatch (set->latch); + } - if( !split ) - bt_unpinlatch (set->latch); + bt_unlockpage(BtLockAtomic, latch); } - - // add keys for any pages split during action + + // add & delete keys for any pages split or merged during transaction if( leaf = head ) do { - ptr = (BtKey *)leaf->leafkey; + set->latch = bt->mgr->latchsets + leaf->entry; + set->page = bt_mappage (bt, set->latch); + bt_putid (value, leaf->page_no); + ptr = (BtKey *)leaf->leafkey; - if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) - return bt->err; + switch( leaf->type ) { + case 0: // insert key + if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) + return -1; + + bt_unlockpage (BtLockParent, set->latch); + break; - bt_unlockpage (BtLockParent, bt->mgr->latchsets + leaf->entry); - bt_unpinlatch (bt->mgr->latchsets + leaf->entry); + case 1: // delete key + if( bt_deletekey (bt, ptr->key, ptr->len, 1) ) + return -1; + + bt_unlockpage (BtLockParent, set->latch); + break; + + case 2: // insert key & free + if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) ) + return -1; + + bt_unlockpage (BtLockAtomic, set->latch); + bt_lockpage (BtLockDelete, set->latch); + bt_lockpage (BtLockWrite, set->latch); + bt_freepage (bt, set); + break; + } + + bt_unpinlatch (set->latch); tail = leaf->next; free (leaf); } while( leaf = tail ); @@ -2676,6 +2755,69 @@ int type; return 0; } +// set cursor to highest slot on highest page + +uint bt_lastkey (BtDb *bt) +{ +uid page_no = bt_getid (bt->mgr->pagezero->alloc->left); +BtPageSet set[1]; +uint slot; + + if( set->latch = bt_pinlatch (bt, page_no, 1) ) + set->page = bt_mappage (bt, set->latch); + else + return 0; + + bt_lockpage(BtLockRead, set->latch); + + memcpy (bt->cursor, set->page, bt->mgr->page_size); + slot = set->page->cnt; + + bt_unlockpage(BtLockRead, set->latch); + bt_unpinlatch (set->latch); + return slot; +} + +// return previous slot on cursor page + +uint bt_prevkey (BtDb *bt, uint slot) +{ +uid ourright, next, us = bt->cursor_page; +BtPageSet set[1]; + + if( --slot ) + return slot; + + ourright = bt_getid(bt->cursor->right); + +goleft: + if( !(next = bt_getid(bt->cursor->left)) ) + return 0; + +findourself: + bt->cursor_page = next; + + if( set->latch = bt_pinlatch (bt, next, 1) ) + set->page = bt_mappage (bt, set->latch); + else + return 0; + + bt_lockpage(BtLockRead, set->latch); + memcpy (bt->cursor, set->page, bt->mgr->page_size); + bt_unlockpage(BtLockRead, set->latch); + bt_unpinlatch (set->latch); + + next = bt_getid (bt->cursor->right); + + if( next != us ) + if( next == ourright ) + goto goleft; + else + goto findourself; + + return bt->cursor->cnt; +} + // return next slot on cursor page // or slide cursor right into next page @@ -2976,14 +3118,14 @@ FILE *in; slotptr(page,cnt)->type = Unique; len = 0; - if( cnt < 25 ) + if( cnt < args->num ) continue; page->cnt = cnt; page->act = cnt; page->min = nxt; - if( bt_atomicmods (bt, page) ) + if( bt_atomictxn (bt, page) ) fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0); nxt = 65536; cnt = 0;