]> pd.if.org Git - btree/commitdiff
Fix broken WriteOnly reentrant locks
authorunknown <karl@E04.petzent.com>
Wed, 15 Oct 2014 05:58:24 +0000 (22:58 -0700)
committerunknown <karl@E04.petzent.com>
Wed, 15 Oct 2014 05:58:24 +0000 (22:58 -0700)
threadskv10.c
threadskv8.c

index 62ee44e3d69fee2b61f67b2cd60044f4b08761a4..6ded2dabc32dbd44d4ed7382ea3c4844d42f976f 100644 (file)
@@ -113,15 +113,15 @@ typedef enum{
 typedef struct {
   union {
        struct {
-         volatile uint xlock:1;        // one writer has exclusive lock
-         volatile uint wrt:31;         // count of other writers waiting
+         volatile ushort xlock;        // one writer has exclusive lock
+         volatile ushort wrt;          // count of other writers waiting
        } bits[1];
        uint value[1];
   };
 } BtMutexLatch;
 
 #define XCL 1
-#define WRT 2
+#define WRT 65536
 
 //     definition for phase-fair reader/writer lock implementation
 
@@ -130,8 +130,6 @@ typedef struct {
        volatile ushort rout[1];
        volatile ushort ticket[1];
        volatile ushort serving[1];
-       volatile ushort tid[1];
-       volatile ushort dup[1];
 } RWLock;
 
 //     write only lock
@@ -320,6 +318,24 @@ typedef struct {
        unsigned char key[BT_keyarray]; // last found complete key
 } BtDb;
 
+//     atomic txn structures
+
+typedef struct {
+       logseqno reqlsn;        // redo log seq no required
+       uint entry;                     // latch table entry number
+       uint slot:31;           // page slot number
+       uint reuse:1;           // reused previous page
+} AtomicTxn;
+
+typedef struct {
+       uid page_no;            // page number for split leaf
+       void *next;                     // next key to insert
+       uint entry:29;          // latch table entry number
+       uint type:2;            // 0 == insert, 1 == delete, 2 == free
+       uint nounlock:1;        // don't unlock ParentModification
+       unsigned char leafkey[BT_keyarray];
+} AtomicKey;
+
 //     Catastrophic errors
 
 typedef enum {
@@ -383,7 +399,8 @@ extern void bt_mgrclose (BtMgr *mgr);
 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
 
-//     transaction functions
+//     atomic transaction functions
+BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
 BTERR bt_txnpromote (BtDb *bt);
 
 //  The page is allocated from low and hi ends.
@@ -470,7 +487,7 @@ uint slept = 0;
   while( 1 ) {
        *prev->value = __sync_fetch_and_or(latch->value, XCL);
 
-       if( !prev->bits->xlock ) {                      // did we set XCL bit?
+       if( !prev->bits->xlock ) {                      // did we set XCL?
            if( slept )
                  __sync_fetch_and_sub(latch->value, WRT);
                return;
@@ -481,7 +498,7 @@ uint slept = 0;
                __sync_fetch_and_add(latch->value, WRT);
        }
 
-       sys_futex (latch->value, FUTEX_WAIT_BITSET, *prev->value, NULL, NULL, QueWr);
+       sys_futex (latch->value, FUTEX_WAIT_BITSET_PRIVATE, *prev->value, NULL, NULL, QueWr);
        slept = 1;
   }
 }
@@ -497,7 +514,7 @@ BtMutexLatch prev[1];
 
        *prev->value = __sync_fetch_and_or(latch->value, XCL);
 
-       //      take write access if exclusive bit is clear
+       //      take write access if exclusive bit was clear
 
        return !prev->bits->xlock;
 }
@@ -511,20 +528,29 @@ BtMutexLatch prev[1];
        *prev->value = __sync_fetch_and_and(latch->value, ~XCL);
 
        if( prev->bits->wrt )
-         sys_futex( latch->value, FUTEX_WAKE_BITSET, 1, NULL, NULL, QueWr );
+         sys_futex( latch->value, FUTEX_WAKE_BITSET_PRIVATE, 1, NULL, NULL, QueWr );
 }
 
 //     Write-Only Queue Lock
 
 void WriteOLock (WOLock *lock, ushort tid)
 {
+  while( 1 ) {
+       bt_mutexlock(lock->xcl);
+
        if( *lock->tid == tid ) {
                *lock->dup += 1;
+               bt_releasemutex(lock->xcl);
                return;
        }
-
-       bt_mutexlock(lock->xcl);
-       *lock->tid = tid;
+       if( !*lock->tid ) {
+               *lock->tid = tid;
+               bt_releasemutex(lock->xcl);
+               return;
+       }
+       bt_releasemutex(lock->xcl);
+       sched_yield();
+  }
 }
 
 void WriteORelease (WOLock *lock)
@@ -535,7 +561,6 @@ void WriteORelease (WOLock *lock)
        }
 
        *lock->tid = 0;
-       bt_releasemutex(lock->xcl);
 }
 
 //     Phase-Fair reader/writer lock implementation
@@ -544,10 +569,6 @@ void WriteLock (RWLock *lock, ushort tid)
 {
 ushort w, r, tix;
 
-       if( *lock->tid == tid ) {
-               *lock->dup += 1;
-               return;
-       }
 #ifdef unix
        tix = __sync_fetch_and_add (lock->ticket, 1);
 #else
@@ -574,17 +595,10 @@ ushort w, r, tix;
 #else
                SwitchToThread();
 #endif
-       *lock->tid = tid;
 }
 
 void WriteRelease (RWLock *lock)
 {
-       if( *lock->dup ) {
-               *lock->dup -= 1;
-               return;
-       }
-
-       *lock->tid = 0;
 #ifdef unix
        __sync_fetch_and_and (lock->rin, ~MASK);
 #else
@@ -600,12 +614,6 @@ int ReadTry (RWLock *lock, ushort tid)
 {
 ushort w;
 
-       //  OK if write lock already held by same thread
-
-       if( *lock->tid == tid ) {
-               *lock->dup += 1;
-               return 1;
-       }
 #ifdef unix
        w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
 #else
@@ -628,10 +636,6 @@ void ReadLock (RWLock *lock, ushort tid)
 {
 ushort w;
 
-       if( *lock->tid == tid ) {
-               *lock->dup += 1;
-               return;
-       }
 #ifdef unix
        w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
 #else
@@ -648,11 +652,6 @@ ushort w;
 
 void ReadRelease (RWLock *lock)
 {
-       if( *lock->dup ) {
-               *lock->dup -= 1;
-               return;
-       }
-
 #ifdef unix
        __sync_fetch_and_add (lock->rout, RINC);
 #else
@@ -781,7 +780,7 @@ uint last, end;
        last = mgr->redolast & ~0xfff;
        end = mgr->redoend;
 
-       if( end - last + sizeof(BtLogHdr) >= 8192 )
+       if( end - last + sizeof(BtLogHdr) >= 32768 )
          if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
          else
@@ -873,7 +872,7 @@ BtVal *val;
        last = mgr->redolast & ~0xfff;
        end = mgr->redoend;
 
-       if( end - last + sizeof(BtLogHdr) >= 8192 )
+       if( end - last + sizeof(BtLogHdr) >= 32768 )
          if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
          else
@@ -883,6 +882,25 @@ BtVal *val;
        return lsn;
 }
 
+//     sync a single btree page to disk
+
+BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
+{
+uint segment = latch->page_no >> 16;
+BtPage perm;
+
+       if( bt_writepage (mgr, page, latch->page_no) )
+               return mgr->err;
+
+       perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
+
+       if( msync (perm, mgr->page_size, MS_SYNC) < 0 )
+         fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
+
+       latch->dirty = 0;
+       return 0;
+}
+
 //     read page into buffer pool from permanent location in Btree file
 
 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
@@ -2432,6 +2450,7 @@ uint idx, librarian;
 BtSlot *node;
 BtKey *ptr;
 BtVal *val;
+int rate;
 
        //      if found slot > desired slot and previous slot
        //      is a librarian slot, use it
@@ -2460,27 +2479,52 @@ BtVal *val;
          if( slotptr(set->page, idx)->dead )
                break;
 
-       // now insert key into array before slot
+       // now insert key into array before slot,
+       //      adding as many librarian slots as
+       //      makes sense.
 
-       if( idx == set->page->cnt )
-               idx += 2, set->page->cnt += 2, librarian = 2;
-       else
-               librarian = 1;
+       if( idx == set->page->cnt ) {
+       int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
 
-       set->latch->dirty = 1;
-       set->page->act++;
+               librarian = ++idx - slot;
+               avail /= sizeof(BtSlot);
 
-       while( idx > slot + librarian - 1 )
-               *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
+               if( avail < 0 )
+                       avail = 0;
 
-       //      add librarian slot
+               if( librarian > avail )
+                       librarian = avail;
 
-       if( librarian > 1 ) {
-               node = slotptr(set->page, slot++);
-               node->off = set->page->min;
-               node->type = Librarian;
-               node->dead = 1;
+               if( librarian ) {
+                       rate = (idx - slot) / librarian;
+                       set->page->cnt += librarian;
+                       idx += librarian;
+               } else
+                       rate = 0;
+       } else
+               librarian = 0, rate = 0;
+
+       while( idx > slot ) {
+               //  transfer slot
+               *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
+               idx--;
+
+               //      add librarian slot per rate
+
+               if( librarian )
+                if( (idx - slot + 1)/2 <= librarian * rate ) {
+//               if( rate && !(idx % rate) ) {
+                       node = slotptr(set->page, idx--);
+                       node->off = node[1].off;
+                       node->type = Librarian;
+                       node->dead = 1;
+                       librarian--;
+                 }
        }
+if(librarian)
+abort();
+       set->latch->dirty = 1;
+       set->page->act++;
 
        //      fill in new slot
 
@@ -2604,23 +2648,6 @@ BtVal *val;
   return 0;
 }
 
-typedef struct {
-       logseqno reqlsn;        // redo log seq no required
-       logseqno lsn;           // redo log sequence number
-       uint entry;                     // latch table entry number
-       uint slot:31;           // page slot number
-       uint reuse:1;           // reused previous page
-} AtomicTxn;
-
-typedef struct {
-       uid page_no;            // page number for split leaf
-       void *next;                     // next key to insert
-       uint entry:29;          // latch table entry number
-       uint type:2;            // 0 == insert, 1 == delete, 2 == free
-       uint nounlock:1;        // don't unlock ParentModification
-       unsigned char leafkey[BT_keyarray];
-} AtomicKey;
-
 //     determine actual page where key is located
 //  return slot number
 
@@ -2663,7 +2690,7 @@ uint entry;
        return 0;
 }
 
-BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
+BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
 {
 BtKey *key = keyptr(source, src);
 BtVal *val = valptr(source, src);
@@ -2675,7 +2702,7 @@ uint entry, slot;
        if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
          if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0) )
                return mgr->err;
-         set->page->lsn = locks[src].lsn;
+         set->page->lsn = lsn;
          return 0;
        }
 
@@ -2699,7 +2726,7 @@ uint entry, slot;
 //     perform delete from smaller btree
 //  insert a delete slot if not found there
 
-BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no)
+BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
 {
 BtKey *key = keyptr(source, src);
 BtPageSet set[1];
@@ -2728,7 +2755,7 @@ BtVal *val;
 
        set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
        set->latch->dirty = 1;
-       set->page->lsn = locks[src].lsn;
+       set->page->lsn = lsn;
        set->page->act--;
 
        node->dead = 0;
@@ -2832,24 +2859,12 @@ BtKey *ptr;
 BTERR bt_atomictxn (BtDb *bt, BtPage source)
 {
 uint src, idx, slot, samepage, entry, que = 0;
-AtomicKey *head, *tail, *leaf;
-BtPageSet set[1], prev[1];
-unsigned char value[BtId];
 BtKey *key, *ptr, *key2;
-BtLatchSet *latch;
-AtomicTxn *locks;
 int result = 0;
 BtSlot temp[1];
 logseqno lsn;
-BtPage page;
-BtVal *val;
-uid right;
 int type;
 
-  locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
-  head = NULL;
-  tail = NULL;
-
   // stable sort the list of keys into order to
   //   prevent deadlocks between threads.
 
@@ -2870,11 +2885,46 @@ int type;
   //  add entries to redo log
 
   if( bt->mgr->pagezero->redopages )
-   if( lsn = bt_txnredo (bt->mgr, source, bt->thread_no) )
-    for( src = 0; src++ < source->cnt; )
-        locks[src].lsn = lsn;
-    else
-        return bt->mgr->err;
+       lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
+  else
+       lsn = 0;
+
+  //  perform the individual actions in the transaction
+
+  if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
+       return bt->mgr->err;
+
+  // if number of active pages
+  // is greater than the buffer pool
+  // promote page into larger btree
+
+  if( bt->main )
+   while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
+       if( bt_txnpromote (bt) )
+         return bt->mgr->err;
+
+  // return success
+
+  return 0;
+}
+
+BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
+{
+uint src, idx, slot, samepage, entry, que = 0;
+AtomicKey *head, *tail, *leaf;
+BtPageSet set[1], prev[1];
+unsigned char value[BtId];
+BtLatchSet *latch;
+AtomicTxn *locks;
+BtKey *key, *ptr;
+BtPage page;
+BtVal *val;
+uid right;
+
+  locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
+
+  head = NULL;
+  tail = NULL;
 
   // Load the leaf page for each key
   // group same page references with reuse bit
@@ -2895,10 +2945,10 @@ int type;
                bt_unlockpage(BtLockRead, set->latch); 
 
        if( !slot )
-         if( slot = bt_loadpage(bt->mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, bt->thread_no) )
+         if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockRead | BtLockAtomic, thread_no) )
                set->latch->split = 0;
          else
-               return bt->mgr->err;
+               return mgr->err;
 
        if( slotptr(set->page, slot)->type == Librarian )
          ptr = keyptr(set->page, ++slot);
@@ -2906,7 +2956,7 @@ int type;
          ptr = keyptr(set->page, slot);
 
        if( !samepage ) {
-         locks[src].entry = set->latch - bt->mgr->latchsets;
+         locks[src].entry = set->latch - mgr->latchsets;
          locks[src].slot = slot;
          locks[src].reuse = 0;
        } else {
@@ -2932,8 +2982,8 @@ int type;
        if( locks[src].reuse )
          continue;
 
-       set->latch = bt->mgr->latchsets + locks[src].entry;
-       bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
+       set->latch = mgr->latchsets + locks[src].entry;
+       bt_lockpage (BtLockWrite, set->latch, thread_no);
   }
 
   // insert or delete each key
@@ -2958,22 +3008,26 @@ int type;
        for( idx = src; idx < samepage; idx++ )
         switch( slotptr(source,idx)->type ) {
         case Delete:
-         if( bt_atomicdelete (bt->mgr, source, locks, idx, bt->thread_no) )
-               return bt->mgr->err;
+         if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
+               return mgr->err;
          break;
 
        case Duplicate:
        case Unique:
-         if( bt_atomicinsert (bt->mgr, source, locks, idx, bt->thread_no) )
-               return bt->mgr->err;
+         if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
+               return mgr->err;
+         break;
+
+       default:
+         bt_atomicpage (mgr, source, locks, idx, set);
          break;
        }
 
        //      after the same page operations have finished,
        //  process master page for splits or deletion.
 
-       latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
-       prev->page = bt_mappage (bt->mgr, prev->latch);
+       latch = prev->latch = mgr->latchsets + locks[src].entry;
+       prev->page = bt_mappage (mgr, prev->latch);
        samepage = src;
 
        //  pick-up all splits from master page
@@ -2982,8 +3036,8 @@ int type;
        entry = prev->latch->split;
 
        while( entry ) {
-         set->latch = bt->mgr->latchsets + entry;
-         set->page = bt_mappage (bt->mgr, set->latch);
+         set->latch = mgr->latchsets + entry;
+         set->page = bt_mappage (mgr, set->latch);
          entry = set->latch->split;
 
          // delete empty master page by undoing its split
@@ -2992,9 +3046,9 @@ int type;
 
          if( !prev->page->act ) {
                memcpy (set->page->left, prev->page->left, BtId);
-               memcpy (prev->page, set->page, bt->mgr->page_size);
-               bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
-               bt_freepage (bt->mgr, set);
+               memcpy (prev->page, set->page, mgr->page_size);
+               bt_lockpage (BtLockDelete, set->latch, thread_no);
+               bt_freepage (mgr, set);
 
                prev->latch->split = set->latch->split;
                prev->latch->dirty = 1;
@@ -3007,8 +3061,8 @@ int type;
          if( !set->page->act ) {
                memcpy (prev->page->right, set->page->right, BtId);
                prev->latch->split = set->latch->split;
-               bt_lockpage (BtLockDelete, set->latch, bt->thread_no);
-               bt_freepage (bt->mgr, set);
+               bt_lockpage (BtLockDelete, set->latch, thread_no);
+               bt_freepage (mgr, set);
                continue;
          }
 
@@ -3018,7 +3072,7 @@ int type;
          leaf = calloc (sizeof(AtomicKey), 1), que++;
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
-         leaf->entry = prev->latch - bt->mgr->latchsets;
+         leaf->entry = prev->latch - mgr->latchsets;
          leaf->page_no = prev->latch->page_no;
          leaf->type = 0;
 
@@ -3032,8 +3086,10 @@ int type;
          // splice in the left link into the split page
 
          bt_putid (set->page->left, prev->latch->page_no);
-         bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
+         bt_lockpage(BtLockParent, prev->latch, thread_no);
          bt_unlockpage(BtLockWrite, prev->latch);
+         if( lsm )
+               bt_syncpage (mgr, prev->page, prev->latch);
          *prev = *set;
        }
 
@@ -3045,20 +3101,20 @@ int type;
          //  far right sibling or set rightmost page in page zero
 
          if( right = bt_getid (prev->page->right) ) {
-               if( set->latch = bt_pinlatch (bt->mgr, right, NULL, bt->thread_no) )
-                 set->page = bt_mappage (bt->mgr, set->latch);
+               if( set->latch = bt_pinlatch (mgr, right, NULL, thread_no) )
+                 set->page = bt_mappage (mgr, set->latch);
                else
-                 return bt->mgr->err;
+                 return mgr->err;
 
-           bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
+           bt_lockpage (BtLockWrite, set->latch, thread_no);
            bt_putid (set->page->left, prev->latch->page_no);
                set->latch->dirty = 1;
            bt_unlockpage (BtLockWrite, set->latch);
-               bt_unpinlatch (bt->mgr, set->latch);
+               bt_unpinlatch (mgr, set->latch);
          } else {      // prev is rightmost page
-           bt_mutexlock (bt->mgr->lock);
-               bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
-           bt_releasemutex(bt->mgr->lock);
+           bt_mutexlock (mgr->lock);
+               bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
+           bt_releasemutex(mgr->lock);
          }
 
          //  Process last page split in chain
@@ -3067,7 +3123,7 @@ int type;
          leaf = calloc (sizeof(AtomicKey), 1), que++;
 
          memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
-         leaf->entry = prev->latch - bt->mgr->latchsets;
+         leaf->entry = prev->latch - mgr->latchsets;
          leaf->page_no = prev->latch->page_no;
          leaf->type = 0;
   
@@ -3078,9 +3134,12 @@ int type;
 
          tail = leaf;
 
-         bt_lockpage(BtLockParent, prev->latch, bt->thread_no);
+         bt_lockpage(BtLockParent, prev->latch, thread_no);
          bt_unlockpage(BtLockWrite, prev->latch);
 
+         if( lsm )
+               bt_syncpage (mgr, prev->page, prev->latch);
+
          //  remove atomic lock on master page
 
          bt_unlockpage(BtLockAtomic, latch);
@@ -3092,7 +3151,11 @@ int type;
        if( prev->page->act ) {
          bt_unlockpage(BtLockWrite, latch);
          bt_unlockpage(BtLockAtomic, latch);
-         bt_unpinlatch(bt->mgr, latch);
+
+         if( lsm )
+               bt_syncpage (mgr, prev->page, latch);
+
+         bt_unpinlatch(mgr, latch);
          continue;
        }
 
@@ -3104,8 +3167,8 @@ int type;
 
        ptr = keyptr(prev->page,prev->page->cnt);
 
-       if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
-               return bt->mgr->err;
+       if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) )
+               return mgr->err;
 
        //      perform the remainder of the delete
        //      from the FIFO queue
@@ -3113,7 +3176,7 @@ int type;
        leaf = calloc (sizeof(AtomicKey), 1), que++;
 
        memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
-       leaf->entry = prev->latch - bt->mgr->latchsets;
+       leaf->entry = prev->latch - mgr->latchsets;
        leaf->page_no = prev->latch->page_no;
        leaf->nounlock = 1;
        leaf->type = 2;
@@ -3129,34 +3192,38 @@ int type;
        //      deletion completes in next phase.
 
        bt_unlockpage(BtLockWrite, prev->latch);
+
+       if( lsm )
+         bt_syncpage (mgr, prev->page, prev->latch);
+
   }
 
   //  add & delete keys for any pages split or merged during transaction
 
   if( leaf = head )
     do {
-         set->latch = bt->mgr->latchsets + leaf->entry;
-         set->page = bt_mappage (bt->mgr, set->latch);
+         set->latch = mgr->latchsets + leaf->entry;
+         set->page = bt_mappage (mgr, set->latch);
 
          bt_putid (value, leaf->page_no);
          ptr = (BtKey *)leaf->leafkey;
 
          switch( leaf->type ) {
          case 0:       // insert key
-           if( bt_insertkey (bt->mgr, ptr->key, ptr->len, 1, value, BtId, Unique, bt->thread_no) )
-                 return bt->mgr->err;
+           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
+                 return mgr->err;
 
                break;
 
          case 1:       // delete key
-               if( bt_deletekey (bt->mgr, ptr->key, ptr->len, 1, bt->thread_no) )
-                 return bt->mgr->err;
+               if( bt_deletekey (mgr, ptr->key, ptr->len, 1, thread_no) )
+                 return mgr->err;
 
                break;
 
          case 2:       // free page
-               if( bt_atomicfree (bt->mgr, set, bt->thread_no) )
-                 return bt->mgr->err;
+               if( bt_atomicfree (mgr, set, thread_no) )
+                 return mgr->err;
 
                break;
          }
@@ -3164,22 +3231,11 @@ int type;
          if( !leaf->nounlock )
            bt_unlockpage (BtLockParent, set->latch);
 
-         bt_unpinlatch (bt->mgr, set->latch);
+         bt_unpinlatch (mgr, set->latch);
          tail = leaf->next;
          free (leaf);
        } while( leaf = tail );
 
-  // if number of active pages
-  // is greater than the buffer pool
-  // promote page into larger btree
-
-  if( bt->main )
-   while( bt->mgr->pagezero->activepages > bt->mgr->latchtotal - 10 )
-       if( bt_txnpromote (bt) )
-         return bt->mgr->err;
-
-  // return success
-
   free (locks);
   return 0;
 }
@@ -3227,14 +3283,14 @@ BtVal *val;
          continue;
        }
 
-       bt_lockpage (BtLockAccess, set->latch, bt->thread_no);
+       bt_lockpage (BtLockAtomic, set->latch, bt->thread_no);
        bt_lockpage (BtLockWrite, set->latch, bt->thread_no);
-    bt_unlockpage(BtLockAccess, set->latch);
 
        // entry interiour node or being or was killed
 
        if( set->page->lvl || set->page->free || set->page->kill ) {
          bt_releasemutex(set->latch->modify);
+      bt_unlockpage(BtLockAtomic, set->latch);
       bt_unlockpage(BtLockWrite, set->latch);
          continue;
        }
@@ -3246,28 +3302,12 @@ BtVal *val;
 
        // transfer slots in our selected page to larger btree
 if( !(set->latch->page_no % 100) )
-fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->cnt);
-
-       for( slot = 0; slot++ < set->page->cnt; ) {
-               ptr = keyptr (set->page, slot);
-               val = valptr (set->page, slot);
-               node = slotptr(set->page, slot);
-
-               switch( node->type ) {
-               case Duplicate:
-               case Unique:
-                 if( bt_insertkey (bt->main, ptr->key, ptr->len, 0, val->value, val->len, node->type, bt->thread_no) )
-                       return bt->main->err;
+fprintf(stderr, "Promote page %d, %d keys\n", set->latch->page_no, set->page->act);
 
-                 continue;
-
-               case Delete:
-                 if( bt_deletekey (bt->main, ptr->key, ptr->len, 0, bt->thread_no) )
-                       return bt->main->err;
+       if( bt_atomicexec (bt->main, set->page, 0, 1, bt->thread_no) )
+               return bt->main->err;
 
-                 continue;
-               }
-       }
+    bt_unlockpage(BtLockAtomic, set->latch);
 
        //  now delete the page
 
index 3f669b83d933d83d640cd3fe7079aacae2eb537f..9d17a4a6a99cccb611911c7e6fbb91b36d973e36 100644 (file)
@@ -112,15 +112,6 @@ typedef struct {
        ushort serving[1];
 } RWLock;
 
-//     write only queue lock
-
-typedef struct {
-       ushort ticket[1];
-       ushort serving[1];
-       ushort tid;
-       ushort dup;
-} WOLock;
-
 #define PHID 0x1
 #define PRES 0x2
 #define MASK 0x3
@@ -143,6 +134,14 @@ volatile typedef struct {
 #define BOTH 3
 #define SHARE 4
 
+//     write only reentrant lock
+
+typedef struct {
+       BtSpinLatch xcl[1];
+       volatile ushort tid[1];
+       volatile ushort dup[1];
+} WOLock;
+
 //  hash table entries
 
 typedef struct {
@@ -411,41 +410,39 @@ uid bt_newdup (BtDb *bt)
 #endif
 }
 
-//     Write-Only Queue Lock
+void bt_spinreleasewrite(BtSpinLatch *latch);
+void bt_spinwritelock(BtSpinLatch *latch);
+
+//     Write-Only Reentrant Lock
 
 void WriteOLock (WOLock *lock, ushort tid)
 {
-ushort tix;
+  while( 1 ) {
+       bt_spinwritelock(lock->xcl);
 
-       if( lock->tid == tid ) {
-               lock->dup++;
+       if( *lock->tid == tid ) {
+               *lock->dup += 1;
+               bt_spinreleasewrite(lock->xcl);
                return;
        }
-#ifdef unix
-       tix = __sync_fetch_and_add (lock->ticket, 1);
-#else
-       tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
-#endif
-       // wait for our ticket to come up
-
-       while( tix != lock->serving[0] )
-#ifdef unix
-               sched_yield();
-#else
-               SwitchToThread ();
-#endif
-       lock->tid = tid;
+       if( !*lock->tid ) {
+               *lock->tid = tid;
+               bt_spinreleasewrite(lock->xcl);
+               return;
+       }
+       bt_spinreleasewrite(lock->xcl);
+       sched_yield();
+  }
 }
 
 void WriteORelease (WOLock *lock)
 {
-       if( lock->dup ) {
-               lock->dup--;
+       if( *lock->dup ) {
+               *lock->dup -= 1;
                return;
        }
 
-       lock->tid = 0;
-       lock->serving[0]++;
+       *lock->tid = 0;
 }
 
 //     Phase-Fair reader/writer lock implementation
@@ -3054,7 +3051,7 @@ uint slot = 0;
                        fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
                memset ((ushort *)latch->access, 0, sizeof(RWLock));
 
-               if( *latch->parent->ticket != *latch->parent->serving )
+               if( *latch->parent->tid )
                        fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
                memset ((ushort *)latch->parent, 0, sizeof(RWLock));
 
@@ -3087,9 +3084,9 @@ BtKey *ptr;
                        fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
                memset ((ushort *)latch->access, 0, sizeof(RWLock));
 
-               if( *latch->parent->ticket != *latch->parent->serving )
+               if( *latch->parent->tid )
                        fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
-               memset ((ushort *)latch->parent, 0, sizeof(RWLock));
+               memset ((ushort *)latch->parent, 0, sizeof(WOLock));
 
                if( latch->pin ) {
                        fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);