]> pd.if.org Git - btree/commitdiff
Fix a few bugs in atomic modification
authorunknown <karl@E04.petzent.com>
Wed, 24 Sep 2014 15:16:37 +0000 (08:16 -0700)
committerunknown <karl@E04.petzent.com>
Wed, 24 Sep 2014 15:16:37 +0000 (08:16 -0700)
threadskv8.c

index d2cb778cb14ceed4064b8f3f17d79101d4fc9fab..3bae89e89b861f8d99806ef2937a5fada4805f80 100644 (file)
@@ -652,6 +652,7 @@ BtLatchSet *latch = bt->mgr->latchsets + slot;
        memset (&latch->atomictid, 0, sizeof(latch->atomictid));
        latch->page_no = page_no;
        latch->entry = slot;
+       latch->split = 0;
        latch->prev = 0;
        latch->pin = 1;
 
@@ -1109,62 +1110,62 @@ int ans;
 
 // place write, read, or parent lock on requested page_no.
 
-void bt_lockpage(BtLock mode, BtLatchSet *set)
+void bt_lockpage(BtLock mode, BtLatchSet *latch)
 {
        switch( mode ) {
        case BtLockRead:
-               ReadLock (set->readwr);
+               ReadLock (latch->readwr);
                break;
        case BtLockWrite:
-               WriteLock (set->readwr);
+               WriteLock (latch->readwr);
                break;
        case BtLockAccess:
-               ReadLock (set->access);
+               ReadLock (latch->access);
                break;
        case BtLockDelete:
-               WriteLock (set->access);
+               WriteLock (latch->access);
                break;
        case BtLockParent:
-               WriteLock (set->parent);
+               WriteLock (latch->parent);
                break;
        case BtLockAtomic:
-               WriteLock (set->atomic);
+               WriteLock (latch->atomic);
                break;
        case BtLockAtomic | BtLockRead:
-               WriteLock (set->atomic);
-               ReadLock (set->readwr);
+               WriteLock (latch->atomic);
+               ReadLock (latch->readwr);
                break;
        }
 }
 
 // remove write, read, or parent lock on requested page
 
-void bt_unlockpage(BtLock mode, BtLatchSet *set)
+void bt_unlockpage(BtLock mode, BtLatchSet *latch)
 {
        switch( mode ) {
        case BtLockRead:
-               ReadRelease (set->readwr);
+               ReadRelease (latch->readwr);
                break;
        case BtLockWrite:
-               WriteRelease (set->readwr);
+               WriteRelease (latch->readwr);
                break;
        case BtLockAccess:
-               ReadRelease (set->access);
+               ReadRelease (latch->access);
                break;
        case BtLockDelete:
-               WriteRelease (set->access);
+               WriteRelease (latch->access);
                break;
        case BtLockParent:
-               WriteRelease (set->parent);
+               WriteRelease (latch->parent);
                break;
        case BtLockAtomic:
-               memset (&set->atomictid, 0, sizeof(set->atomictid));
-               WriteRelease (set->atomic);
+               memset (&latch->atomictid, 0, sizeof(latch->atomictid));
+               WriteRelease (latch->atomic);
                break;
        case BtLockAtomic | BtLockRead:
-               ReadRelease (set->readwr);
-               memset (&set->atomictid, 0, sizeof(set->atomictid));
-               WriteRelease (set->atomic);
+               ReadRelease (latch->readwr);
+               memset (&latch->atomictid, 0, sizeof(latch->atomictid));
+               WriteRelease (latch->atomic);
                break;
        }
 }
@@ -1219,14 +1220,14 @@ int blk;
 
 //  find slot in page for given key at a given level
 
-int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
+int bt_findslot (BtPage page, unsigned char *key, uint len)
 {
-uint diff, higher = set->page->cnt, low = 1, slot;
+uint diff, higher = page->cnt, low = 1, slot;
 uint good = 0;
 
        //        make stopper key an infinite fence value
 
-       if( bt_getid (set->page->right) )
+       if( bt_getid (page->right) )
                higher++;
        else
                good++;
@@ -1239,7 +1240,7 @@ uint good = 0;
 
        while( diff = higher - low ) {
                slot = low + ( diff >> 1 );
-               if( keycmp (keyptr(set->page, slot), key, len) < 0 )
+               if( keycmp (keyptr(page, slot), key, len) < 0 )
                        low = slot + 1;
                else
                        higher = slot, good++;
@@ -1329,7 +1330,7 @@ uint mode, prevmode;
        if( set->page->kill )
          goto slideright;
 
-       if( slot = bt_findslot (set, key, len) ) {
+       if( slot = bt_findslot (set->page, key, len) ) {
          if( drill == lvl )
                return slot;
 
@@ -2263,7 +2264,7 @@ uint slot = locks[src].slot;
 uint entry;
 
        if( src > 1 && locks[src].reuse )
-         entry = locks[src-1].entry, locks[src].entry = entry, slot = 0;
+         entry = locks[src-1].entry, slot = 0;
        else
          entry = locks[src].entry;
 
@@ -2281,9 +2282,11 @@ uint entry;
                set->latch = bt->mgr->latchsets + entry;
                set->page = bt_mappage (bt, set->latch);
 
-               if( slot = bt_findslot(set, key->key, key->len) )
-                       return slot;
-
+               if( slot = bt_findslot(set->page, key->key, key->len) ) {
+                 if( locks[src].reuse )
+                       locks[src].entry = entry;
+                 return slot;
+               }
        } while( entry = set->latch->split );
 
        bt->err = BTERR_atomic;
@@ -2296,15 +2299,20 @@ BtKey *key = keyptr(source, src);
 BtVal *val = valptr(source, src);
 BtLatchSet *latch;
 BtPageSet set[1];
-uint slot, entry;
+uint entry;
+
+  while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
+       if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
+         return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
 
-  while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
-       if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
-         return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
        if( entry = bt_splitpage (bt, set) )
          latch = bt->mgr->latchsets + entry;
        else
          return bt->err;
+
+       //      splice right page into split chain
+       //      and WriteLock it.
+
        latch->split = set->latch->split;
        set->latch->split = entry;
        bt_lockpage(BtLockWrite, latch);
@@ -2413,22 +2421,22 @@ int type;
 
        if( samepage = src > 1 )
          if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) > 0 )
-               slot = bt_findslot(set, key->key, key->len);
+               slot = bt_findslot(set->page, key->key, key->len);
          else // release read on previous page
                bt_unlockpage(BtLockRead, set->latch); 
 
-       if( !slot ) {
-         slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead);
-         set->latch->split = 0;
-       }
-
        if( !slot )
-         return -1;
+         if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) )
+               set->latch->split = 0;
+         else
+               return -1;
+
+       if( slotptr(set->page, slot)->type == Librarian )
+         ptr = keyptr(set->page, ++slot);
+       else
+         ptr = keyptr(set->page, slot);
 
        if( !samepage ) {
-         for(idx = 0; idx++ < source->cnt; )
-               if( locks[idx].entry == set->latch->entry )
-                 abort();
          locks[src].entry = set->latch->entry;
          locks[src].slot = slot;
          locks[src].reuse = 0;
@@ -2438,11 +2446,6 @@ int type;
          locks[src].reuse = 1;
        }
 
-       if( slotptr(set->page, slot)->type == Librarian )
-         ptr = keyptr(set->page, ++slot);
-       else
-         ptr = keyptr(set->page, slot);
-
        switch( slotptr(source, src)->type ) {
        case Duplicate:
        case Unique:
@@ -2491,6 +2494,7 @@ int type;
 
          return result;
        }
+
   }
 
   //  unlock last loadpage lock
@@ -2512,7 +2516,8 @@ int type;
                return -1;
          else
                continue;
-         else if( bt_atomicinsert (bt, source, locks, src) )
+       else
+         if( bt_atomicinsert (bt, source, locks, src) )
                return -1;
          else
                continue;
@@ -2528,8 +2533,7 @@ int type;
        prev->latch = bt->mgr->latchsets + locks[src].entry;
        prev->page = bt_mappage (bt, prev->latch);
 
-       //  pick-up all splits from first entry
-       //  save page that points to this entry
+       //  pick-up all splits from original page
 
        split = next = prev->latch->split;
 
@@ -2540,6 +2544,7 @@ int type;
          set->latch = bt->mgr->latchsets + entry;
          set->page = bt_mappage (bt, set->latch);
          next = set->latch->split;
+         set->latch->split = 0;
 
          // delete empty previous page
 
@@ -2556,8 +2561,7 @@ int type;
                continue;
          }
 
-         // prev page is not emptied
-
+         // prev page is not emptied 
          locks[src].emptied = 0;
 
          //  schedule previous fence key update
@@ -2575,7 +2579,6 @@ int type;
          else
                head = leaf;
 
-         latch = prev->latch;
          tail = leaf;
 
          // remove empty block from the split chain
@@ -2584,13 +2587,16 @@ int type;
                memcpy (prev->page->right, set->page->right, BtId);
                bt_lockpage (BtLockDelete, set->latch);
                bt_freepage (bt, set);
-         } else
-               *prev = *set;
+               continue;
+         }
 
-         bt_lockpage(BtLockParent, latch);
-         bt_unlockpage(BtLockWrite, latch);
+         bt_lockpage(BtLockParent, prev->latch);
+         bt_unlockpage(BtLockWrite, prev->latch);
+         *prev = *set;
        }
 
+       //  was entire chain emptied?
+
        if( !prev->page->act )
                continue;
 
@@ -2627,21 +2633,24 @@ int type;
        if( locks[src].reuse )
          continue;
 
-       //  delete page emptied by our atomic action
-
        set->latch = bt->mgr->latchsets + locks[src].entry;
        set->page = bt_mappage (bt, set->latch);
 
-       if( locks[src].emptied ) {
-         bt_unlockpage (BtLockAtomic, set->latch);
-         if( bt_deletepage (bt, set) )
-               return bt->err;
-         continue;
-       }
+       //  clear original page split field
 
+       split = set->latch->split;
+       set->latch->split = 0;
        bt_unlockpage (BtLockAtomic, set->latch);
 
-       if( !set->latch->split )
+       //  delete page emptied by our atomic action
+
+       if( locks[src].emptied )
+         if( bt_deletepage (bt, set) )
+               return bt->err;
+         else
+               continue;
+
+       if( !split )
                bt_unpinlatch (set->latch);
   }