]> pd.if.org Git - btree/commitdiff
Add re-entrant WOnly queued lock, remove bt_atomicload
authorunknown <karl@E04.petzent.com>
Tue, 30 Sep 2014 05:52:00 +0000 (22:52 -0700)
committerunknown <karl@E04.petzent.com>
Tue, 30 Sep 2014 05:52:00 +0000 (22:52 -0700)
threadskv8.c

index 6b377106ad1666078fbfa663a520df47ea0b9dc4..3f669b83d933d83d640cd3fe7079aacae2eb537f 100644 (file)
@@ -117,6 +117,8 @@ typedef struct {
 typedef struct {
        ushort ticket[1];
        ushort serving[1];
 typedef struct {
        ushort ticket[1];
        ushort serving[1];
+       ushort tid;
+       ushort dup;
 } WOLock;
 
 #define PHID 0x1
 } WOLock;
 
 #define PHID 0x1
@@ -411,9 +413,14 @@ uid bt_newdup (BtDb *bt)
 
 //     Write-Only Queue Lock
 
 
 //     Write-Only Queue Lock
 
-void WriteOLock (WOLock *lock)
+void WriteOLock (WOLock *lock, ushort tid)
 {
 ushort tix;
 {
 ushort tix;
+
+       if( lock->tid == tid ) {
+               lock->dup++;
+               return;
+       }
 #ifdef unix
        tix = __sync_fetch_and_add (lock->ticket, 1);
 #else
 #ifdef unix
        tix = __sync_fetch_and_add (lock->ticket, 1);
 #else
@@ -427,10 +434,17 @@ ushort tix;
 #else
                SwitchToThread ();
 #endif
 #else
                SwitchToThread ();
 #endif
+       lock->tid = tid;
 }
 
 void WriteORelease (WOLock *lock)
 {
 }
 
 void WriteORelease (WOLock *lock)
 {
+       if( lock->dup ) {
+               lock->dup--;
+               return;
+       }
+
+       lock->tid = 0;
        lock->serving[0]++;
 }
 
        lock->serving[0]++;
 }
 
@@ -1168,10 +1182,14 @@ void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
                WriteLock (latch->access);
                break;
        case BtLockParent:
                WriteLock (latch->access);
                break;
        case BtLockParent:
-               WriteOLock (latch->parent);
+               WriteOLock (latch->parent, bt->thread_no);
                break;
        case BtLockAtomic:
                break;
        case BtLockAtomic:
-               WriteOLock (latch->atomic);
+               WriteOLock (latch->atomic, bt->thread_no);
+               break;
+       case BtLockAtomic | BtLockRead:
+               WriteOLock (latch->atomic, bt->thread_no);
+               ReadLock (latch->readwr);
                break;
        }
 }
                break;
        }
 }
@@ -1199,6 +1217,10 @@ void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
        case BtLockAtomic:
                WriteORelease (latch->atomic);
                break;
        case BtLockAtomic:
                WriteORelease (latch->atomic);
                break;
+       case BtLockAtomic | BtLockRead:
+               WriteORelease (latch->atomic);
+               ReadRelease (latch->readwr);
+               break;
        }
 }
 
        }
 }
 
@@ -1569,7 +1591,6 @@ BtKey *ptr;
 
        bt_unlockpage (bt, BtLockParent, set->latch);
        bt_unpinlatch (set->latch);
 
        bt_unlockpage (bt, BtLockParent, set->latch);
        bt_unpinlatch (set->latch);
-       bt->found = 1;
        return 0;
 }
 
        return 0;
 }
 
@@ -1620,7 +1641,7 @@ BtVal *val;
          if( bt_fixfence (bt, set, lvl) )
                return bt->err;
          else
          if( bt_fixfence (bt, set, lvl) )
                return bt->err;
          else
-               return bt->found = found, 0;
+               return 0;
 
        //      do we need to collapse root?
 
 
        //      do we need to collapse root?
 
@@ -1628,7 +1649,7 @@ BtVal *val;
          if( bt_collapseroot (bt, set) )
                return bt->err;
          else
          if( bt_collapseroot (bt, set) )
                return bt->err;
          else
-               return bt->found = found, 0;
+               return 0;
 
        //      delete empty page
 
 
        //      delete empty page
 
@@ -1638,7 +1659,7 @@ BtVal *val;
        set->latch->dirty = 1;
        bt_unlockpage(bt, BtLockWrite, set->latch);
        bt_unpinlatch (set->latch);
        set->latch->dirty = 1;
        bt_unlockpage(bt, BtLockWrite, set->latch);
        bt_unpinlatch (set->latch);
-       return bt->found = found, 0;
+       return 0;
 }
 
 BtKey *bt_foundkey (BtDb *bt)
 }
 
 BtKey *bt_foundkey (BtDb *bt)
@@ -2273,79 +2294,6 @@ typedef struct {
        unsigned char leafkey[BT_keyarray];
 } AtomicKey;
 
        unsigned char leafkey[BT_keyarray];
 } AtomicKey;
 
-//  find and load leaf page for given key
-//     leave page Atomic locked and Read locked.
-
-int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
-{
-BtLatchSet *prevlatch;
-uid page_no;
-uint slot;
-
-  //  find level one slot
-
-  if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
-       return 0;
-
-  // find next non-dead entry on this page
-  //   it will be the fence key if nothing else
-
-  while( slotptr(set->page, slot)->dead )
-       if( slot++ < set->page->cnt )
-         continue;
-       else
-         return bt->err = BTERR_struct, 0;
-
-  page_no = bt_getid(valptr(set->page, slot)->value);
-  prevlatch = set->latch;
-
-  while( page_no ) {
-       if( set->latch = bt_pinlatch (bt, page_no, 1) )
-         set->page = bt_mappage (bt, set->latch);
-       else
-         return 0;
-
-       // obtain read lock using lock chaining with Access mode
-       //      release & unpin parent/left sibling page
-
-       bt_lockpage(bt, BtLockAccess, set->latch);
-
-       bt_unlockpage(bt, BtLockRead, prevlatch);
-       bt_unpinlatch (prevlatch);
-
-       bt_lockpage(bt, BtLockRead, set->latch);
-
-       //  find key on page at this level
-       //  and descend to requested level
-
-       if( !set->page->kill )
-        if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
-         bt_unlockpage(bt, BtLockRead, set->latch);
-         bt_lockpage(bt, BtLockAtomic, set->latch);
-         bt_lockpage(bt, BtLockRead, set->latch);
-
-         if( !set->page->kill )
-          if( slot = bt_findslot (set->page, key, len) ) {
-               bt_unlockpage(bt, BtLockAccess, set->latch);
-               return slot;
-          }
-
-         bt_unlockpage(bt, BtLockAtomic, set->latch);
-         }
-
-       //  slide right into next page
-
-       bt_unlockpage(bt, BtLockAccess, set->latch);
-       page_no = bt_getid(set->page->right);
-       prevlatch = set->latch;
-  }
-
-  // return error on end of right chain
-
-  bt->err = BTERR_struct;
-  return 0;    // return error
-}
-
 //     determine actual page where key is located
 //  return slot number
 
 //     determine actual page where key is located
 //  return slot number
 
@@ -2589,11 +2537,11 @@ int type;
        if( samepage = src > 1 )
          if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
                slot = bt_findslot(set->page, key->key, key->len);
        if( samepage = src > 1 )
          if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
                slot = bt_findslot(set->page, key->key, key->len);
-         else // release read on previous page
+         else
                bt_unlockpage(bt, BtLockRead, set->latch); 
 
        if( !slot )
                bt_unlockpage(bt, BtLockRead, set->latch); 
 
        if( !slot )
-         if( slot = bt_atomicload(bt, set, key->key, key->len) )
+         if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
                set->latch->split = 0;
          else
                return -1;
                set->latch->split = 0;
          else
                return -1;
@@ -2642,7 +2590,7 @@ int type;
 
   //  unlock last loadpage lock
 
 
   //  unlock last loadpage lock
 
-  if( source->cnt > 1 )
+  if( source->cnt )
        bt_unlockpage(bt, BtLockRead, set->latch);
 
   //  obtain write lock for each master page
        bt_unlockpage(bt, BtLockRead, set->latch);
 
   //  obtain write lock for each master page