]> pd.if.org Git - btree/commitdiff
Fix bt_atomictxn to process delete key requests properly
authorunknown <karl@E04.petzent.com>
Fri, 26 Sep 2014 21:35:42 +0000 (14:35 -0700)
committerunknown <karl@E04.petzent.com>
Fri, 26 Sep 2014 21:35:42 +0000 (14:35 -0700)
threadskv8.c

index 3bae89e89b861f8d99806ef2937a5fada4805f80..44793a09d99365c97979c5bd79d44905ff555283 100644 (file)
@@ -3,10 +3,11 @@
 //     phase-fair reader writer lock,
 //     librarian page split code,
 //     duplicate key management
+//     bi-directional cursors
 //     traditional buffer pool manager
-//     and ACID batched key updates
+//     and ACID batched key-value updates
 
-// 21 SEP 2014
+// 26 SEP 2014
 
 // author: karl malbrain, malbrain@cal.berkeley.edu
 
@@ -235,6 +236,8 @@ typedef struct BtPage_ {
        unsigned char lvl:7;            // level of page
        unsigned char kill:1;           // page is being deleted
        unsigned char right[BtId];      // page number to right
+       unsigned char left[BtId];       // page number to left
+       unsigned char filler[2];        // padding to multiple of 8
 } *BtPage;
 
 //  The loadpage interface object
@@ -977,6 +980,11 @@ BtVal *val;
        pagezero->alloc->bits = mgr->page_bits;
        bt_putid(pagezero->alloc->right, MIN_lvl+1);
 
+       //  initialize left-most LEAF page in
+       //      alloc->left.
+
+       bt_putid (pagezero->alloc->left, LEAF_page);
+
        if( bt_writepage (mgr, pagezero->alloc, 0) ) {
                fprintf (stderr, "Unable to create btree page zero\n");
                return bt_mgrclose (mgr), NULL;
@@ -1277,7 +1285,7 @@ uint mode, prevmode;
 
        set->page = bt_mappage (bt, set->latch);
 
-       //      release & unpin parent page
+       //      release & unpin parent or left sibling page
 
        if( prevpage ) {
          bt_unlockpage(prevmode, prevlatch);
@@ -1285,7 +1293,7 @@ uint mode, prevmode;
          prevpage = 0;
        }
 
-       //      skip Atomic lock on leaf page if already held
+       //      skip Atomic lock on leaf page we need to slide over
 
        if( !drill )
         if( mode & BtLockAtomic )
@@ -1378,7 +1386,6 @@ void bt_freepage (BtDb *bt, BtPageSet *set)
 
        bt_unlockpage (BtLockDelete, set->latch);
        bt_unlockpage (BtLockWrite, set->latch);
-       bt_unpinlatch (set->latch);
 
        // unlock allocation page
 
@@ -1460,6 +1467,7 @@ uint idx;
        root->latch->dirty = 1;
 
        bt_freepage (bt, child);
+       bt_unpinlatch (child->latch);
 
   } while( root->page->lvl > 1 && root->page->act == 1 );
 
@@ -1546,6 +1554,7 @@ BtKey *ptr;
        bt_lockpage (BtLockDelete, right->latch);
        bt_lockpage (BtLockWrite, right->latch);
        bt_freepage (bt, right);
+       bt_unpinlatch (right->latch);
 
        bt_unlockpage (BtLockParent, set->latch);
        bt_unpinlatch (set->latch);
@@ -2242,20 +2251,20 @@ uint type;
 
 typedef struct {
        uint entry;                     // latch table entry number
-       uint slot:30;           // page slot number
+       uint slot:31;           // page slot number
        uint reuse:1;           // reused previous page
-       uint emptied:1;         // page was emptied
 } AtomicMod;
 
 typedef struct {
        uid page_no;            // page number for split leaf
        void *next;                     // next key to insert
-       uint entry;                     // latch table entry number
+       uint entry:30;          // latch table entry number
+       uint type:2;            // 0 == insert, 1 == delete, 2 == free
        unsigned char leafkey[BT_keyarray];
 } AtomicKey;
 
 //     determine actual page where key is located
-//  return slot number with set page locked
+//  return slot number
 
 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
 {
@@ -2338,31 +2347,11 @@ BtVal *val;
        val = valptr(set->page, slot);
 
        set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
-       set->page->act--;
-
-       // collapse empty slots beneath the fence
-
-       while( idx = set->page->cnt - 1 )
-         if( slotptr(set->page, idx)->dead ) {
-               *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
-               memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
-         } else
-               break;
-
        set->latch->dirty = 1;
+       set->page->act--;
        return 0;
 }
 
-//     qsort comparison function
-
-int bt_qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
-{
-BtKey *key1 = (BtKey *)((unsigned char *)page + slot1->off);
-BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off);
-
-       return keycmp (key1, key2->key, key2->len);
-}
-
 //     atomic modification of a batch of keys.
 
 //     return -1 if BTERR is set
@@ -2370,9 +2359,9 @@ BtKey *key2 = (BtKey *)((unsigned char *)page + slot2->off);
 //     causing the key constraint violation
 //     or zero on successful completion.
 
-int bt_atomicmods (BtDb *bt, BtPage source)
+int bt_atomictxn (BtDb *bt, BtPage source)
 {
-uint src, idx, slot, samepage, entry, next, split;
+uint src, idx, slot, samepage, entry;
 AtomicKey *head, *tail, *leaf;
 BtPageSet set[1], prev[1];
 unsigned char value[BtId];
@@ -2383,16 +2372,16 @@ int result = 0;
 BtSlot temp[1];
 BtPage page;
 BtVal *val;
+uid right;
 int type;
 
   locks = calloc (source->cnt + 1, sizeof(AtomicMod));
   head = NULL;
   tail = NULL;
 
-  // first sort the list of keys into order to
+  // stable sort the list of keys into order to
   //   prevent deadlocks between threads.
-  qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)bt_qsortcmp, source);
-/*
+
   for( src = 1; src++ < source->cnt; ) {
        *temp = *slotptr(source,src);
        key = keyptr (source,src);
@@ -2406,13 +2395,13 @@ int type;
                break;
        }
   }
-*/
-  // Load the leafpage for each key
+
+  // Load the leaf page for each key
+  // group same page references with reuse bit
   // and determine any constraint violations
 
   for( src = 0; src++ < source->cnt; ) {
        key = keyptr(source, src);
-       val = valptr(source, src);
        slot = 0;
 
        // first determine if this modification falls
@@ -2420,7 +2409,7 @@ int type;
        //      note that the far right leaf page is a special case
 
        if( samepage = src > 1 )
-         if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 )
+         if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
                slot = bt_findslot(set->page, key->key, key->len);
          else // release read on previous page
                bt_unlockpage(BtLockRead, set->latch); 
@@ -2451,50 +2440,26 @@ int type;
        case Unique:
          if( !slotptr(set->page, slot)->dead )
           if( slot < set->page->cnt || bt_getid (set->page->right) )
-           if( ptr->len == key->len && !memcmp (ptr->key, key->key, key->len) ) {
+           if( !keycmp (ptr, key->key, key->len) ) {
 
                  // return constraint violation if key already exists
 
+                 bt_unlockpage(BtLockRead, set->latch);
                  result = src;
 
                  while( src ) {
                        if( locks[src].entry ) {
                          set->latch = bt->mgr->latchsets + locks[src].entry;
                          bt_unlockpage(BtLockAtomic, set->latch);
-                         bt_unlockpage(BtLockRead, set->latch);
                          bt_unpinlatch (set->latch);
                        }
                        src--;
                  }
-
+                 free (locks);
                  return result;
                }
-
          break;
-
-       case Delete:
-         if( !slotptr(set->page, slot)->dead )
-          if( ptr->len == key->len )
-               if( !memcmp (ptr->key, key->key, key->len) )
-             break;
-
-         //  Key to delete doesn't exist
-
-         result = src;
-
-         while( src ) {
-               if( locks[src].entry ) {
-                 set->latch = bt->mgr->latchsets + locks[src].entry;
-                 bt_unlockpage(BtLockAtomic, set->latch);
-                 bt_unlockpage(BtLockRead, set->latch);
-                 bt_unpinlatch (set->latch);
-               }
-               src--;
-         }
-
-         return result;
        }
-
   }
 
   //  unlock last loadpage lock
@@ -2502,69 +2467,88 @@ int type;
   if( source->cnt > 1 )
        bt_unlockpage(BtLockRead, set->latch);
 
-  //  obtain write lock for each page
+  //  obtain write lock for each master page
 
   for( src = 0; src++ < source->cnt; )
-       if( locks[src].entry )
+       if( locks[src].reuse )
+         continue;
+       else
          bt_lockpage(BtLockWrite, bt->mgr->latchsets + locks[src].entry);
 
   // insert or delete each key
+  // process any splits or merges
+  // release Write & Atomic latches
+  // set ParentModifications and build
+  // queue of keys to insert for split pages
+  // or delete for deleted pages.
 
-  for( src = 0; src++ < source->cnt; )
-       if( slotptr(source,src)->type == Delete )
-         if( bt_atomicdelete (bt, source, locks, src) )
-               return -1;
-         else
-               continue;
-       else
-         if( bt_atomicinsert (bt, source, locks, src) )
-               return -1;
-         else
-               continue;
+  // run through txn list backwards
 
-  // set ParentModification and release WriteLock
-  // leave empty pages locked for later processing
-  // build queue of keys to insert for split pages
+  samepage = source->cnt + 1;
 
-  for( src = 0; src++ < source->cnt; ) {
+  for( src = source->cnt; src; src-- ) {
        if( locks[src].reuse )
          continue;
 
-       prev->latch = bt->mgr->latchsets + locks[src].entry;
-       prev->page = bt_mappage (bt, prev->latch);
+       //  perform the txn operations
+       //      from smaller to larger on
+       //  the same page
+
+       for( idx = src; idx < samepage; idx++ )
+        switch( slotptr(source,idx)->type ) {
+        case Delete:
+         if( bt_atomicdelete (bt, source, locks, idx) )
+               return -1;
+         break;
 
-       //  pick-up all splits from original page
+       case Duplicate:
+       case Unique:
+         if( bt_atomicinsert (bt, source, locks, idx) )
+               return -1;
+         break;
+       }
+
+       //      after the same page operations have finished,
+       //  process master page for splits or deletion.
 
-       split = next = prev->latch->split;
+       latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
+       prev->page = bt_mappage (bt, prev->latch);
+       samepage = src;
 
-       if( !prev->page->act )
-         locks[src].emptied = 1;
+       //  pick-up all splits from master page
 
-       while( entry = next ) {
+       entry = prev->latch->split;
+
+       while( entry ) {
          set->latch = bt->mgr->latchsets + entry;
          set->page = bt_mappage (bt, set->latch);
-         next = set->latch->split;
-         set->latch->split = 0;
+         entry = set->latch->split;
 
-         // delete empty previous page
+         // delete empty master page
 
          if( !prev->page->act ) {
+               memcpy (set->page->left, prev->page->left, BtId);
                memcpy (prev->page, set->page, bt->mgr->page_size);
                bt_lockpage (BtLockDelete, set->latch);
                bt_freepage (bt, set);
+               bt_unpinlatch (set->latch);
 
                prev->latch->dirty = 1;
+               continue;
+         }
 
-               if( prev->page->act )
-                 locks[src].emptied = 0;
+         // remove empty page from the split chain
 
+         if( !set->page->act ) {
+               memcpy (prev->page->right, set->page->right, BtId);
+               prev->latch->split = set->latch->split;
+               bt_lockpage (BtLockDelete, set->latch);
+               bt_freepage (bt, set);
+               bt_unpinlatch (set->latch);
                continue;
          }
 
-         // prev page is not emptied 
-         locks[src].emptied = 0;
-
-         //  schedule previous fence key update
+         //  schedule prev fence key update
 
          ptr = keyptr(prev->page,prev->page->cnt);
          leaf = malloc (sizeof(AtomicKey));
@@ -2573,6 +2557,7 @@ int type;
          leaf->page_no = prev->latch->page_no;
          leaf->entry = prev->latch->entry;
          leaf->next = NULL;
+         leaf->type = 0;
 
          if( tail )
                tail->next = leaf;
@@ -2581,31 +2566,70 @@ int type;
 
          tail = leaf;
 
-         // remove empty block from the split chain
-
-         if( !set->page->act ) {
-               memcpy (prev->page->right, set->page->right, BtId);
-               bt_lockpage (BtLockDelete, set->latch);
-               bt_freepage (bt, set);
-               continue;
-         }
-
+         bt_putid (set->page->left, prev->latch->page_no);
          bt_lockpage(BtLockParent, prev->latch);
          bt_unlockpage(BtLockWrite, prev->latch);
          *prev = *set;
        }
 
-       //  was entire chain emptied?
+       //  update left pointer in next right page
+       //      if we did any now non-empty splits
 
-       if( !prev->page->act )
-               continue;
+       if( latch->split ) {
+         //  fix left pointer in master's original right sibling
+         //    or set rightmost page in page zero
 
-       if( !split ) {
-               bt_unlockpage(BtLockWrite, prev->latch);
-               continue;
+         if( right = bt_getid (prev->page->right) ) {
+               if( set->latch = bt_pinlatch (bt, bt_getid(prev->page->right), 1) )
+                 set->page = bt_mappage (bt, set->latch);
+               else
+                 return -1;
+
+           bt_lockpage (BtLockWrite, set->latch);
+           bt_putid (set->page->left, prev->latch->page_no);
+               set->latch->dirty = 1;
+           bt_unlockpage (BtLockWrite, set->latch);
+               bt_unpinlatch (set->latch);
+         } else
+               bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
+
+         //  Process last page split in chain
+
+         ptr = keyptr(prev->page,prev->page->cnt);
+         leaf = malloc (sizeof(AtomicKey));
+
+         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+         leaf->page_no = prev->latch->page_no;
+         leaf->entry = prev->latch->entry;
+         leaf->next = NULL;
+         leaf->type = 0;
+  
+         if( tail )
+               tail->next = leaf;
+         else
+               head = leaf;
+
+         tail = leaf;
+
+         bt_lockpage(BtLockParent, prev->latch);
+         bt_unlockpage(BtLockWrite, prev->latch);
+         bt_unlockpage(BtLockAtomic, latch);
+         continue;
        }
 
-       //      Process last page split in chain
+       //  finished if prev page occupied (either master or final split)
+
+       if( prev->page->act ) {
+         bt_unlockpage(BtLockWrite, latch);
+         bt_unlockpage(BtLockAtomic, latch);
+         bt_unpinlatch(latch);
+         continue;
+       }
+
+       // any splits were reversed, and the
+       // master page located in prev is empty, delete it
+       // by pulling over master's right sibling.
+       // Delete empty master's fence
 
        ptr = keyptr(prev->page,prev->page->cnt);
        leaf = malloc (sizeof(AtomicKey));
@@ -2614,7 +2638,8 @@ int type;
        leaf->page_no = prev->latch->page_no;
        leaf->entry = prev->latch->entry;
        leaf->next = NULL;
-
+       leaf->type = 1;
+  
        if( tail )
          tail->next = leaf;
        else
@@ -2624,48 +2649,102 @@ int type;
 
        bt_lockpage(BtLockParent, prev->latch);
        bt_unlockpage(BtLockWrite, prev->latch);
-  }
+
+       if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
+               set->page = bt_mappage (bt, set->latch);
+       else
+               return -1;
+
+       //      add page to our transaction
+
+       bt_lockpage(BtLockAtomic, set->latch);
+       bt_lockpage(BtLockWrite, set->latch);
+
+       //      pull contents over empty page
+
+       memcpy (set->page->left, prev->page->left, BtId);
+       memcpy (prev->page, set->page, bt->mgr->page_size);
+
+       bt_putid (set->page->right, prev->latch->page_no);
+       set->latch->dirty = 1;
+       set->page->kill = 1;
+
+       //      add new parent key for new master page contents
+       //      delete it after parent posts the new master fence.
+
+       ptr = keyptr(set->page,set->page->cnt);
+       leaf = malloc (sizeof(AtomicKey));
+
+       memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
+       leaf->page_no = prev->latch->page_no;
+       leaf->entry = set->latch->entry;
+       leaf->next = NULL;
+       leaf->type = 2;
   
-  // remove Atomic locks and
-  // process any empty pages
+       if( tail )
+         tail->next = leaf;
+       else
+         head = leaf;
 
-  for( src = source->cnt; src; src-- ) {
-       if( locks[src].reuse )
-         continue;
+       tail = leaf;
 
-       set->latch = bt->mgr->latchsets + locks[src].entry;
-       set->page = bt_mappage (bt, set->latch);
+//     bt_lockpage(BtLockParent, set->latch);
+       bt_unlockpage(BtLockWrite, set->latch);
 
-       //  clear original page split field
+       //  fix master's far right sibling's left pointer
 
-       split = set->latch->split;
-       set->latch->split = 0;
-       bt_unlockpage (BtLockAtomic, set->latch);
+       if( right = bt_getid (set->page->right) ) {
+         if( set->latch = bt_pinlatch (bt, right, 1) )
+               set->page = bt_mappage (bt, set->latch);
 
-       //  delete page emptied by our atomic action
+         bt_lockpage (BtLockWrite, set->latch);
+         bt_putid (set->page->left, prev->latch->page_no);
+         set->latch->dirty = 1;
 
-       if( locks[src].emptied )
-         if( bt_deletepage (bt, set) )
-               return bt->err;
-         else
-               continue;
+         bt_unlockpage (BtLockWrite, set->latch);
+         bt_unpinlatch (set->latch);
+       }
 
-       if( !split )
-               bt_unpinlatch (set->latch);
+       bt_unlockpage(BtLockAtomic, latch);
   }
-
-  //  add keys for any pages split during action
+  
+  //  add & delete keys for any pages split or merged during transaction
 
   if( leaf = head )
     do {
-         ptr = (BtKey *)leaf->leafkey;
+         set->latch = bt->mgr->latchsets + leaf->entry;
+         set->page = bt_mappage (bt, set->latch);
+
          bt_putid (value, leaf->page_no);
+         ptr = (BtKey *)leaf->leafkey;
 
-         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
-               return bt->err;
+         switch( leaf->type ) {
+         case 0:       // insert key
+           if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+                 return -1;
+
+           bt_unlockpage (BtLockParent, set->latch);
+               break;
 
-         bt_unlockpage (BtLockParent, bt->mgr->latchsets + leaf->entry);
-         bt_unpinlatch (bt->mgr->latchsets + leaf->entry);
+         case 1:       // delete key
+               if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
+                 return -1;
+
+           bt_unlockpage (BtLockParent, set->latch);
+               break;
+
+         case 2:       // insert key & free
+           if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
+                 return -1;
+
+               bt_unlockpage (BtLockAtomic, set->latch);
+               bt_lockpage (BtLockDelete, set->latch);
+               bt_lockpage (BtLockWrite, set->latch);
+               bt_freepage (bt, set);
+               break;
+         }
+
+         bt_unpinlatch (set->latch);
          tail = leaf->next;
          free (leaf);
        } while( leaf = tail );
@@ -2676,6 +2755,69 @@ int type;
   return 0;
 }
 
+//     set cursor to highest slot on highest page
+
+uint bt_lastkey (BtDb *bt)
+{
+uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
+BtPageSet set[1];
+uint slot;
+
+       if( set->latch = bt_pinlatch (bt, page_no, 1) )
+               set->page = bt_mappage (bt, set->latch);
+       else
+               return 0;
+
+    bt_lockpage(BtLockRead, set->latch);
+
+       memcpy (bt->cursor, set->page, bt->mgr->page_size);
+       slot = set->page->cnt;
+
+    bt_unlockpage(BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       return slot;
+}
+
+//     return previous slot on cursor page
+
+uint bt_prevkey (BtDb *bt, uint slot)
+{
+uid ourright, next, us = bt->cursor_page;
+BtPageSet set[1];
+
+       if( --slot )
+               return slot;
+
+       ourright = bt_getid(bt->cursor->right);
+
+goleft:
+       if( !(next = bt_getid(bt->cursor->left)) )
+               return 0;
+
+findourself:
+       bt->cursor_page = next;
+
+       if( set->latch = bt_pinlatch (bt, next, 1) )
+               set->page = bt_mappage (bt, set->latch);
+       else
+               return 0;
+
+    bt_lockpage(BtLockRead, set->latch);
+       memcpy (bt->cursor, set->page, bt->mgr->page_size);
+       bt_unlockpage(BtLockRead, set->latch);
+       bt_unpinlatch (set->latch);
+       
+       next = bt_getid (bt->cursor->right);
+
+       if( next != us )
+         if( next == ourright )
+               goto goleft;
+         else
+               goto findourself;
+
+       return bt->cursor->cnt;
+}
+
 //  return next slot on cursor page
 //  or slide cursor right into next page
 
@@ -2976,14 +3118,14 @@ FILE *in;
                          slotptr(page,cnt)->type = Unique;
                          len = 0;
 
-                         if( cnt < 25 )
+                         if( cnt < args->num )
                                continue;
 
                          page->cnt = cnt;
                          page->act = cnt;
                          page->min = nxt;
 
-                         if( bt_atomicmods (bt, page) )
+                         if( bt_atomictxn (bt, page) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          nxt = 65536;
                          cnt = 0;