]> pd.if.org Git - btree/commitdiff
add txn level zero page loader
authorunknown <karl@E04.petzent.com>
Fri, 26 Sep 2014 23:05:39 +0000 (16:05 -0700)
committerunknown <karl@E04.petzent.com>
Fri, 26 Sep 2014 23:05:39 +0000 (16:05 -0700)
threadskv8.c

index 44793a09d99365c97979c5bd79d44905ff555283..574bbd9b275f4a096f1de23e595af7df1b36602e 100644 (file)
@@ -155,11 +155,6 @@ typedef struct {
        uint prev;                              // prev entry in hash table chain
        volatile ushort pin;    // number of outstanding threads
        ushort dirty:1;                 // page in cache is dirty
-#ifdef unix
-       pthread_t atomictid;    // pid holding atomic lock
-#else
-       uint atomictid;                 // pid holding atomic lock
-#endif
 } BtLatchSet;
 
 //     Define the length of the page record numbers
@@ -652,7 +647,6 @@ BtLatchSet *latch = bt->mgr->latchsets + slot;
                bt->mgr->latchsets[latch->next].prev = slot;
 
        bt->mgr->hashtable[hashidx].slot = slot;
-       memset (&latch->atomictid, 0, sizeof(latch->atomictid));
        latch->page_no = page_no;
        latch->entry = slot;
        latch->split = 0;
@@ -1139,10 +1133,6 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch)
        case BtLockAtomic:
                WriteLock (latch->atomic);
                break;
-       case BtLockAtomic | BtLockRead:
-               WriteLock (latch->atomic);
-               ReadLock (latch->readwr);
-               break;
        }
 }
 
@@ -1167,12 +1157,6 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch)
                WriteRelease (latch->parent);
                break;
        case BtLockAtomic:
-               memset (&latch->atomictid, 0, sizeof(latch->atomictid));
-               WriteRelease (latch->atomic);
-               break;
-       case BtLockAtomic | BtLockRead:
-               ReadRelease (latch->readwr);
-               memset (&latch->atomictid, 0, sizeof(latch->atomictid));
                WriteRelease (latch->atomic);
                break;
        }
@@ -1293,20 +1277,10 @@ uint mode, prevmode;
          prevpage = 0;
        }
 
-       //      skip Atomic lock on leaf page we need to slide over
-
-       if( !drill )
-        if( mode & BtLockAtomic )
-         if( pthread_equal (set->latch->atomictid, pthread_self()) )
-               mode &= ~BtLockAtomic;
-
        // obtain mode lock using lock chaining through AccessLock
 
        bt_lockpage(mode, set->latch);
 
-       if( mode & BtLockAtomic )
-               set->latch->atomictid = pthread_self();
-
        if( set->page->free )
                return bt->err = BTERR_struct, 0;
 
@@ -1342,11 +1316,13 @@ uint mode, prevmode;
          if( drill == lvl )
                return slot;
 
+         // find next non-dead slot -- the fence key if nothing else
+
          while( slotptr(set->page, slot)->dead )
                if( slot++ < set->page->cnt )
-                       continue;
+                 continue;
                else
-                       goto slideright;
+                 return bt->err = BTERR_struct, 0;
 
          page_no = bt_getid(valptr(set->page, slot)->value);
          drill--;
@@ -2263,6 +2239,81 @@ typedef struct {
        unsigned char leafkey[BT_keyarray];
 } AtomicKey;
 
+//  find and load leaf page for given key
+//     leave page Atomic locked and Read locked.
+
+int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
+{
+BtLatchSet *prevlatch;
+uid page_no;
+uint slot;
+
+  //  find level zero page
+
+  if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
+       return 0;
+
+  // find next non-dead entry on this page
+  //   it will be the fence key if nothing else
+
+  while( slotptr(set->page, slot)->dead )
+       if( slot++ < set->page->cnt )
+         continue;
+       else
+         return bt->err = BTERR_struct, 0;
+
+  page_no = bt_getid(valptr(set->page, slot)->value);
+  prevlatch = set->latch;
+
+  while( page_no ) {
+       if( set->latch = bt_pinlatch (bt, page_no, 1) )
+         set->page = bt_mappage (bt, set->latch);
+       else
+         return 0;
+
+       if( set->page->free || set->page->lvl )
+               return bt->err = BTERR_struct, 0;
+
+       // obtain read lock using lock chaining with Access mode
+       //      release & unpin parent/left sibling page
+
+       bt_lockpage(BtLockAccess, set->latch);
+
+       bt_unlockpage(BtLockRead, prevlatch);
+       bt_unpinlatch (prevlatch);
+
+       bt_lockpage(BtLockRead, set->latch);
+       bt_unlockpage(BtLockAccess, set->latch);
+
+       //  find key on page at this level
+       //  and descend to requested level
+
+       if( !set->page->kill )
+        if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
+         bt_unlockpage(BtLockRead, set->latch);
+         bt_lockpage(BtLockAtomic, set->latch);
+         bt_lockpage(BtLockAccess, set->latch);
+         bt_lockpage(BtLockRead, set->latch);
+         bt_unlockpage(BtLockAccess, set->latch);
+
+         if( slot = bt_findslot (set->page, key, len) )
+               return slot;
+
+         bt_unlockpage(BtLockAtomic, set->latch);
+         }
+
+       //  slide right into next page
+
+       page_no = bt_getid(set->page->right);
+       prevlatch = set->latch;
+  }
+
+  // return error on end of right chain
+
+  bt->err = BTERR_struct;
+  return 0;    // return error
+}
+
 //     determine actual page where key is located
 //  return slot number
 
@@ -2415,7 +2466,7 @@ int type;
                bt_unlockpage(BtLockRead, set->latch); 
 
        if( !slot )
-         if( slot = bt_loadpage (bt, set, key->key, key->len, 0, BtLockAtomic | BtLockRead) )
+         if( slot = bt_atomicload(bt, set, key->key, key->len) )
                set->latch->split = 0;
          else
                return -1;