]> pd.if.org Git - btree/blobdiff - threadskv8.c
Fix small bug in main when there is less t han one input file
[btree] / threadskv8.c
index 44411881f67455309eede48bb81ef3ae46e81d77..9d17a4a6a99cccb611911c7e6fbb91b36d973e36 100644 (file)
@@ -112,13 +112,6 @@ typedef struct {
        ushort serving[1];
 } RWLock;
 
-//     write only queue lock
-
-typedef struct {
-       ushort ticket[1];
-       ushort serving[1];
-} WOLock;
-
 #define PHID 0x1
 #define PRES 0x2
 #define MASK 0x3
@@ -141,6 +134,14 @@ volatile typedef struct {
 #define BOTH 3
 #define SHARE 4
 
+//     write only reentrant lock
+
+typedef struct {
+       BtSpinLatch xcl[1];
+       volatile ushort tid[1];
+       volatile ushort dup[1];
+} WOLock;
+
 //  hash table entries
 
 typedef struct {
@@ -409,29 +410,39 @@ uid bt_newdup (BtDb *bt)
 #endif
 }
 
-//     Write-Only Queue Lock
+void bt_spinreleasewrite(BtSpinLatch *latch);
+void bt_spinwritelock(BtSpinLatch *latch);
 
-void WriteOLock (WOLock *lock)
+//     Write-Only Reentrant Lock
+
+void WriteOLock (WOLock *lock, ushort tid)
 {
-ushort tix;
-#ifdef unix
-       tix = __sync_fetch_and_add (lock->ticket, 1);
-#else
-       tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
-#endif
-       // wait for our ticket to come up
+  while( 1 ) {
+       bt_spinwritelock(lock->xcl);
 
-       while( tix != lock->serving[0] )
-#ifdef unix
-               sched_yield();
-#else
-               SwitchToThread ();
-#endif
+       if( *lock->tid == tid ) {
+               *lock->dup += 1;
+               bt_spinreleasewrite(lock->xcl);
+               return;
+       }
+       if( !*lock->tid ) {
+               *lock->tid = tid;
+               bt_spinreleasewrite(lock->xcl);
+               return;
+       }
+       bt_spinreleasewrite(lock->xcl);
+       sched_yield();
+  }
 }
 
 void WriteORelease (WOLock *lock)
 {
-       lock->serving[0]++;
+       if( *lock->dup ) {
+               *lock->dup -= 1;
+               return;
+       }
+
+       *lock->tid = 0;
 }
 
 //     Phase-Fair reader/writer lock implementation
@@ -1168,10 +1179,14 @@ void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
                WriteLock (latch->access);
                break;
        case BtLockParent:
-               WriteOLock (latch->parent);
+               WriteOLock (latch->parent, bt->thread_no);
                break;
        case BtLockAtomic:
-               WriteOLock (latch->atomic);
+               WriteOLock (latch->atomic, bt->thread_no);
+               break;
+       case BtLockAtomic | BtLockRead:
+               WriteOLock (latch->atomic, bt->thread_no);
+               ReadLock (latch->readwr);
                break;
        }
 }
@@ -1199,6 +1214,10 @@ void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
        case BtLockAtomic:
                WriteORelease (latch->atomic);
                break;
+       case BtLockAtomic | BtLockRead:
+               WriteORelease (latch->atomic);
+               ReadRelease (latch->readwr);
+               break;
        }
 }
 
@@ -1569,7 +1588,6 @@ BtKey *ptr;
 
        bt_unlockpage (bt, BtLockParent, set->latch);
        bt_unpinlatch (set->latch);
-       bt->found = 1;
        return 0;
 }
 
@@ -1620,7 +1638,7 @@ BtVal *val;
          if( bt_fixfence (bt, set, lvl) )
                return bt->err;
          else
-               return bt->found = found, 0;
+               return 0;
 
        //      do we need to collapse root?
 
@@ -1628,7 +1646,7 @@ BtVal *val;
          if( bt_collapseroot (bt, set) )
                return bt->err;
          else
-               return bt->found = found, 0;
+               return 0;
 
        //      delete empty page
 
@@ -1638,7 +1656,7 @@ BtVal *val;
        set->latch->dirty = 1;
        bt_unlockpage(bt, BtLockWrite, set->latch);
        bt_unpinlatch (set->latch);
-       return bt->found = found, 0;
+       return 0;
 }
 
 BtKey *bt_foundkey (BtDb *bt)
@@ -1777,7 +1795,9 @@ BtVal *val;
        while( cnt++ < max ) {
                if( cnt == slot )
                        newslot = idx + 2;
-               if( cnt < max && slotptr(bt->frame,cnt)->dead )
+
+               if( cnt < max || bt->frame->lvl )
+                 if( slotptr(bt->frame,cnt)->dead )
                        continue;
 
                // copy the value across
@@ -1794,11 +1814,9 @@ BtVal *val;
 
                // make a librarian slot
 
-               if( idx ) {
-                       slotptr(page, ++idx)->off = nxt;
-                       slotptr(page, idx)->type = Librarian;
-                       slotptr(page, idx)->dead = 1;
-               }
+               slotptr(page, ++idx)->off = nxt;
+               slotptr(page, idx)->type = Librarian;
+               slotptr(page, idx)->dead = 1;
 
                // set up the slot
 
@@ -1918,8 +1936,10 @@ uint prev;
        idx = 0;
 
        while( cnt++ < max ) {
-               if( slotptr(set->page, cnt)->dead && cnt < max )
+               if( cnt < max || set->page->lvl )
+                 if( slotptr(set->page, cnt)->dead )
                        continue;
+
                src = valptr(set->page, cnt);
                nxt -= src->len + sizeof(BtVal);
                memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
@@ -1931,11 +1951,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(bt->frame, ++idx)->off = nxt;
-                       slotptr(bt->frame, idx)->type = Librarian;
-                       slotptr(bt->frame, idx)->dead = 1;
-               }
+               slotptr(bt->frame, ++idx)->off = nxt;
+               slotptr(bt->frame, idx)->type = Librarian;
+               slotptr(bt->frame, idx)->dead = 1;
 
                //  add actual slot
 
@@ -1990,11 +2008,9 @@ uint prev;
 
                //      add librarian slot
 
-               if( idx ) {
-                       slotptr(set->page, ++idx)->off = nxt;
-                       slotptr(set->page, idx)->type = Librarian;
-                       slotptr(set->page, idx)->dead = 1;
-               }
+               slotptr(set->page, ++idx)->off = nxt;
+               slotptr(set->page, idx)->type = Librarian;
+               slotptr(set->page, idx)->dead = 1;
 
                //      add actual slot
 
@@ -2264,7 +2280,7 @@ typedef struct {
        uint entry;                     // latch table entry number
        uint slot:31;           // page slot number
        uint reuse:1;           // reused previous page
-} AtomicMod;
+} AtomicTxn;
 
 typedef struct {
        uid page_no;            // page number for split leaf
@@ -2275,83 +2291,10 @@ typedef struct {
        unsigned char leafkey[BT_keyarray];
 } AtomicKey;
 
-//  find and load leaf page for given key
-//     leave page Atomic locked and Read locked.
-
-int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
-{
-BtLatchSet *prevlatch;
-uid page_no;
-uint slot;
-
-  //  find level one slot
-
-  if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
-       return 0;
-
-  // find next non-dead entry on this page
-  //   it will be the fence key if nothing else
-
-  while( slotptr(set->page, slot)->dead )
-       if( slot++ < set->page->cnt )
-         continue;
-       else
-         return bt->err = BTERR_struct, 0;
-
-  page_no = bt_getid(valptr(set->page, slot)->value);
-  prevlatch = set->latch;
-
-  while( page_no ) {
-       if( set->latch = bt_pinlatch (bt, page_no, 1) )
-         set->page = bt_mappage (bt, set->latch);
-       else
-         return 0;
-
-       // obtain read lock using lock chaining with Access mode
-       //      release & unpin parent/left sibling page
-
-       bt_lockpage(bt, BtLockAccess, set->latch);
-
-       bt_unlockpage(bt, BtLockRead, prevlatch);
-       bt_unpinlatch (prevlatch);
-
-       bt_lockpage(bt, BtLockRead, set->latch);
-
-       //  find key on page at this level
-       //  and descend to requested level
-
-       if( !set->page->kill )
-        if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
-         bt_unlockpage(bt, BtLockRead, set->latch);
-         bt_lockpage(bt, BtLockAtomic, set->latch);
-         bt_lockpage(bt, BtLockRead, set->latch);
-
-         if( !set->page->kill )
-          if( slot = bt_findslot (set->page, key, len) ) {
-               bt_unlockpage(bt, BtLockAccess, set->latch);
-               return slot;
-          }
-
-         bt_unlockpage(bt, BtLockAtomic, set->latch);
-         }
-
-       //  slide right into next page
-
-       bt_unlockpage(bt, BtLockAccess, set->latch);
-       page_no = bt_getid(set->page->right);
-       prevlatch = set->latch;
-  }
-
-  // return error on end of right chain
-
-  bt->err = BTERR_struct;
-  return 0;    // return error
-}
-
 //     determine actual page where key is located
 //  return slot number
 
-uint bt_atomicpage (BtDb *bt, BtPage source, AtomicMod *locks, uint src, BtPageSet *set)
+uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
 {
 BtKey *key = keyptr(source,src);
 uint slot = locks[src].slot;
@@ -2368,15 +2311,18 @@ uint entry;
                return slot;
        }
 
-       //      is locks->reuse set?
-       //      if so, find where our key
-       //      is located on previous page or split pages
+       //      is locks->reuse set? or was slot zeroed?
+       //      if so, find where our key is located 
+       //      on current page or pages split on
+       //      same page txn operations.
 
        do {
                set->latch = bt->mgr->latchsets + entry;
                set->page = bt_mappage (bt, set->latch);
 
                if( slot = bt_findslot(set->page, key->key, key->len) ) {
+                 if( slotptr(set->page, slot)->type == Librarian )
+                       slot++;
                  if( locks[src].reuse )
                        locks[src].entry = entry;
                  return slot;
@@ -2387,17 +2333,17 @@ uint entry;
        return 0;
 }
 
-BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
+BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
 {
 BtKey *key = keyptr(source, src);
 BtVal *val = valptr(source, src);
 BtLatchSet *latch;
 BtPageSet set[1];
-uint entry;
+uint entry, slot;
 
-  while( locks[src].slot = bt_atomicpage (bt, source, locks, src, set) ) {
-       if( locks[src].slot = bt_cleanpage(bt, set, key->len, locks[src].slot, val->len) )
-         return bt_insertslot (bt, set, locks[src].slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
+  while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
+       if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
+         return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
 
        if( entry = bt_splitpage (bt, set) )
          latch = bt->mgr->latchsets + entry;
@@ -2407,15 +2353,16 @@ uint entry;
        //      splice right page into split chain
        //      and WriteLock it.
 
+       bt_lockpage(bt, BtLockWrite, latch);
        latch->split = set->latch->split;
        set->latch->split = entry;
-       bt_lockpage(bt, BtLockWrite, latch);
+       locks[src].slot = 0;
   }
 
   return bt->err = BTERR_atomic;
 }
 
-BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicMod *locks, uint src)
+BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
 {
 BtKey *key = keyptr(source, src);
 uint idx, entry, slot;
@@ -2424,16 +2371,23 @@ BtKey *ptr;
 BtVal *val;
 
        if( slot = bt_atomicpage (bt, source, locks, src, set) )
+         ptr = keyptr(set->page, slot);
+       else
+         return bt->err = BTERR_struct;
+
+       if( !keycmp (ptr, key->key, key->len) )
+         if( !slotptr(set->page, slot)->dead )
                slotptr(set->page, slot)->dead = 1;
+         else
+               return 0;
        else
-               return bt->err = BTERR_struct;
+               return 0;
 
-       ptr = keyptr(set->page, slot);
        val = valptr(set->page, slot);
-
        set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
        set->latch->dirty = 1;
        set->page->act--;
+       bt->found++;
        return 0;
 }
 
@@ -2536,7 +2490,7 @@ BtPageSet set[1], prev[1];
 unsigned char value[BtId];
 BtKey *key, *ptr, *key2;
 BtLatchSet *latch;
-AtomicMod *locks;
+AtomicTxn *locks;
 int result = 0;
 BtSlot temp[1];
 BtPage page;
@@ -2544,7 +2498,7 @@ BtVal *val;
 uid right;
 int type;
 
-  locks = calloc (source->cnt + 1, sizeof(AtomicMod));
+  locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
   head = NULL;
   tail = NULL;
 
@@ -2580,11 +2534,11 @@ int type;
        if( samepage = src > 1 )
          if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
                slot = bt_findslot(set->page, key->key, key->len);
-         else // release read on previous page
+         else
                bt_unlockpage(bt, BtLockRead, set->latch); 
 
        if( !slot )
-         if( slot = bt_atomicload(bt, set, key->key, key->len) )
+         if( slot = bt_loadpage(bt, set, key->key, key->len, 0, BtLockRead | BtLockAtomic) )
                set->latch->split = 0;
          else
                return -1;
@@ -2633,7 +2587,7 @@ int type;
 
   //  unlock last loadpage lock
 
-  if( source->cnt > 1 )
+  if( source->cnt )
        bt_unlockpage(bt, BtLockRead, set->latch);
 
   //  obtain write lock for each master page
@@ -3097,7 +3051,7 @@ uint slot = 0;
                        fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
                memset ((ushort *)latch->access, 0, sizeof(RWLock));
 
-               if( *latch->parent->ticket != *latch->parent->serving )
+               if( *latch->parent->tid )
                        fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
                memset ((ushort *)latch->parent, 0, sizeof(RWLock));
 
@@ -3130,9 +3084,9 @@ BtKey *ptr;
                        fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
                memset ((ushort *)latch->access, 0, sizeof(RWLock));
 
-               if( *latch->parent->ticket != *latch->parent->serving )
+               if( *latch->parent->tid )
                        fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
-               memset ((ushort *)latch->parent, 0, sizeof(RWLock));
+               memset ((ushort *)latch->parent, 0, sizeof(WOLock));
 
                if( latch->pin ) {
                        fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
@@ -3287,7 +3241,7 @@ FILE *in;
                        }
                        else if( len < BT_maxkey )
                                key[len++] = ch;
-               fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
+               fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
                break;
 
        case 'w':