pd.if.org Git - btree/commitdiff
Fix bug in reverse scan. New command line interface with a command per input file
author unknown <karl@E04.petzent.com>
Sat, 27 Sep 2014 18:02:29 +0000 (11:02 -0700)
committer unknown <karl@E04.petzent.com>
Sat, 27 Sep 2014 18:02:29 +0000 (11:02 -0700)
threadskv8.c

index f2a23c7d1ecbfddbf4957e65831b386a973c9946..91ec929670d8fd4ed6a27cd4fe040f8c9c038ec5 100644 (file)
@@ -112,6 +112,13 @@ typedef struct {
        ushort serving[1];
 } RWLock;
 
+//     write only queue lock: exclusive access only (no reader side),
+//     callers are admitted in the order in which they draw tickets
+typedef struct {
+       ushort ticket[1];       // next ticket to hand out (advanced in WriteOLock)
+       ushort serving[1];      // ticket currently admitted (advanced in WriteORelease)
+} WOLock;
+
 #define PHID 0x1
 #define PRES 0x2
 #define MASK 0x3
@@ -147,8 +154,8 @@ typedef struct {
        uid page_no;                    // latch set page number
        RWLock readwr[1];               // read/write page lock
        RWLock access[1];               // Access Intent/Page delete
-       RWLock parent[1];               // Posting of fence key in parent
-       RWLock atomic[1];               // Atomic update in progress
+       WOLock parent[1];               // Posting of fence key in parent
+       WOLock atomic[1];               // Atomic update in progress
        uint split;                             // right split page atomic insert
        uint entry;                             // entry slot in latch table
        uint next;                              // next entry in hash table chain
@@ -400,6 +407,31 @@ uid bt_newdup (BtDb *bt)
 #endif
 }
 
+//     Write-Only Queue Lock
+//     acquire: draw a ticket, then yield the cpu until that ticket is served
+void WriteOLock (WOLock *lock)
+{
+ushort tix;            // ticket drawn by this caller
+#ifdef unix
+       tix = __sync_fetch_and_add (lock->ticket, 1);   // returns the value held before the add
+#else
+       tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
+#endif
+       // wait for our ticket to come up
+       // (serving is advanced by WriteORelease)
+       while( tix != lock->serving[0] )
+#ifdef unix
+               sched_yield();
+#else
+               SwitchToThread ();
+#endif
+}
+
+void WriteORelease (WOLock *lock)      // release: admit the holder of the next ticket
+{
+       lock->serving[0]++;     // plain (non-atomic) increment; NOTE(review): presumably safe because only the current holder calls this -- confirm
+}
+
 //     Phase-Fair reader/writer lock implementation
 
 void WriteLock (RWLock *lock)
@@ -1129,10 +1161,10 @@ void bt_lockpage(BtLock mode, BtLatchSet *latch)
                WriteLock (latch->access);
                break;
        case BtLockParent:
-               WriteLock (latch->parent);
+               WriteOLock (latch->parent);
                break;
        case BtLockAtomic:
-               WriteLock (latch->atomic);
+               WriteOLock (latch->atomic);
                break;
        }
 }
@@ -1155,10 +1187,10 @@ void bt_unlockpage(BtLock mode, BtLatchSet *latch)
                WriteRelease (latch->access);
                break;
        case BtLockParent:
-               WriteRelease (latch->parent);
+               WriteORelease (latch->parent);
                break;
        case BtLockAtomic:
-               WriteRelease (latch->atomic);
+               WriteORelease (latch->atomic);
                break;
        }
 }
@@ -2636,11 +2668,11 @@ int type;
          *prev = *set;
        }
 
-       //  update left pointer in next right page
-       //      if we did any now non-empty splits
+       //  update left pointer in next right page from last split page
+       //      (if all splits were reversed, latch->split == 0)
 
        if( latch->split ) {
-         //  fix left pointer in master's original right sibling
+         //  fix left pointer in master's original (now split) right sibling
          //    or set rightmost page in page zero
 
          if( right = bt_getid (prev->page->right) ) {
@@ -2654,8 +2686,11 @@ int type;
                set->latch->dirty = 1;
            bt_unlockpage (BtLockWrite, set->latch);
                bt_unpinlatch (set->latch);
-         } else
+         } else {      // prev is rightmost page
+           bt_spinwritelock (bt->mgr->lock);
                bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
+           bt_spinreleasewrite(bt->mgr->lock);
+         }
 
          //  Process last page split in chain
 
@@ -2676,6 +2711,9 @@ int type;
 
          bt_lockpage(BtLockParent, prev->latch);
          bt_unlockpage(BtLockWrite, prev->latch);
+
+         //  remove atomic lock on master page
+
          bt_unlockpage(BtLockAtomic, latch);
          continue;
        }
@@ -2689,10 +2727,11 @@ int type;
          continue;
        }
 
-       // any splits were reversed, and the
+       // any and all splits were reversed, and the
        // master page located in prev is empty, delete it
        // by pulling over master's right sibling.
-       // Delete empty master's fence
+
+       // Delete empty master's fence key
 
        ptr = keyptr(prev->page,prev->page->cnt);
        leaf = calloc (sizeof(AtomicKey), 1);
@@ -2713,6 +2752,10 @@ int type;
        bt_unlockpage(BtLockWrite, prev->latch);
 
        // grab master's right sibling
+       //      note that the far right page never empties because
+       //      it always contains (at least) the infinite stopper key
+       //      and that all pages that don't contain any keys have
+       //      been deleted, or are being deleted under write lock.
 
        if( set->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
                set->page = bt_mappage (bt, set->latch);
@@ -2721,11 +2764,15 @@ int type;
 
        bt_lockpage(BtLockWrite, set->latch);
 
-       //      pull contents over empty page
+       //      and pull contents over empty page
+       //      while preserving master's left link
 
        memcpy (set->page->left, prev->page->left, BtId);
        memcpy (prev->page, set->page, bt->mgr->page_size);
 
+       //      forward seekers of the old right sibling
+       //      to its new page location in prev
+
        bt_putid (set->page->right, prev->latch->page_no);
        set->latch->dirty = 1;
        set->page->kill = 1;
@@ -2741,7 +2788,8 @@ int type;
        leaf->entry = set->latch->entry;
        leaf->type = 2;
   
-       //  see if right sibling is not yet in the FIFO
+       //  see if right sibling key update is in the FIFO,
+       //      and ParentLock if not there.
 
        if( !bt_parentmatch (head, leaf->entry) )
          bt_lockpage(BtLockParent, set->latch);
@@ -2755,7 +2803,7 @@ int type;
 
        tail = leaf;
 
-       //  fix master's far right sibling's left pointer
+       //  fix new master's right sibling's left pointer
 
        if( right = bt_getid (set->page->right) ) {
          if( set->latch = bt_pinlatch (bt, right, 1) )
@@ -2767,6 +2815,10 @@ int type;
 
          bt_unlockpage (BtLockWrite, set->latch);
          bt_unpinlatch (set->latch);
+       } else {        // master is now the far right page
+         bt_spinwritelock (bt->mgr->lock);
+         bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
+         bt_spinreleasewrite(bt->mgr->lock);
        }
 
        bt_unlockpage(BtLockAtomic, latch);
@@ -2825,7 +2877,6 @@ uint bt_lastkey (BtDb *bt)
 {
 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
 BtPageSet set[1];
-uint slot;
 
        if( set->latch = bt_pinlatch (bt, page_no, 1) )
                set->page = bt_mappage (bt, set->latch);
@@ -2833,13 +2884,12 @@ uint slot;
                return 0;
 
     bt_lockpage(BtLockRead, set->latch);
-
        memcpy (bt->cursor, set->page, bt->mgr->page_size);
-       slot = set->page->cnt;
-
     bt_unlockpage(BtLockRead, set->latch);
        bt_unpinlatch (set->latch);
-       return slot;
+
+       bt->cursor_page = page_no;
+       return bt->cursor->cnt;
 }
 
 //     return previous slot on cursor page
@@ -2873,6 +2923,9 @@ findourself:
        
        next = bt_getid (bt->cursor->right);
 
+       if( bt->cursor->kill )
+               goto findourself;
+
        if( next != us )
          if( next == ourright )
                goto goleft;
@@ -3034,7 +3087,7 @@ uint slot = 0;
                        fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
                memset ((ushort *)latch->access, 0, sizeof(RWLock));
 
-               if( *latch->parent->rin & MASK )
+               if( *latch->parent->ticket != *latch->parent->serving )
                        fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
                memset ((ushort *)latch->parent, 0, sizeof(RWLock));
 
@@ -3067,7 +3120,7 @@ BtKey *ptr;
                        fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
                memset ((ushort *)latch->access, 0, sizeof(RWLock));
 
-               if( *latch->parent->rin & MASK )
+               if( *latch->parent->ticket != *latch->parent->serving )
                        fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
                memset ((ushort *)latch->parent, 0, sizeof(RWLock));
 
@@ -3136,12 +3189,12 @@ void *index_file (void *arg)
 uint __stdcall index_file (void *arg)
 #endif
 {
-int line = 0, found = 0, cnt = 0, unique;
+int line = 0, found = 0, cnt = 0, idx;
 uid next, page_no = LEAF_page; // start on first page of leaves
+int ch, len = 0, slot, type = 0;
 unsigned char key[BT_maxkey];
 unsigned char txn[65536];
 ThreadArg *args = arg;
-int ch, len = 0, slot;
 BtPageSet set[1];
 uint nxt = 65536;
 BtPage page;
@@ -3153,9 +3206,12 @@ FILE *in;
        bt = bt_open (args->mgr);
        page = (BtPage)txn;
 
-       unique = (args->type[1] | 0x20) != 'd';
+       if( args->idx < strlen (args->type) )
+               ch = args->type[args->idx];
+       else
+               ch = args->type[strlen(args->type) - 1];
 
-       switch(args->type[0] | 0x20)
+       switch(ch | 0x20)
        {
        case 'a':
                fprintf(stderr, "started latch mgr audit\n");
@@ -3163,13 +3219,37 @@ FILE *in;
                fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
                break;
 
-       case 't':
-               fprintf(stderr, "started TXN pennysort for %s\n", args->infile);
+       case 'd':
+               type = Delete;
+
+       case 'p':
+               if( !type )
+                       type = Unique;
+
+               if( args->num )
+                if( type == Delete )
+                 fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
+                else
+                 fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
+               else
+                if( type == Delete )
+                 fprintf(stderr, "started pennysort delete for %s\n", args->infile);
+                else
+                 fprintf(stderr, "started pennysort insert for %s\n", args->infile);
+
                if( in = fopen (args->infile, "rb") )
                  while( ch = getc(in), ch != EOF )
                        if( ch == '\n' )
                        {
                          line++;
+
+                         if( !args->num ) {
+                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
+                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
+                           len = 0;
+                               continue;
+                         }
+
                          nxt -= len - 10;
                          memcpy (txn + nxt, key + 10, len - 10);
                          nxt -= 1;
@@ -3179,7 +3259,7 @@ FILE *in;
                          nxt -= 1;
                          txn[nxt] = 10;
                          slotptr(page,++cnt)->off  = nxt;
-                         slotptr(page,cnt)->type = Delete;
+                         slotptr(page,cnt)->type = type;
                          len = 0;
 
                          if( cnt < args->num )
@@ -3191,7 +3271,7 @@ FILE *in;
 
                          if( bt_atomictxn (bt, page) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
-                         nxt = 65536;
+                         nxt = sizeof(txn);
                          cnt = 0;
 
                        }
@@ -3200,23 +3280,6 @@ FILE *in;
                fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
                break;
 
-       case 'p':
-               fprintf(stderr, "started pennysort for %s\n", args->infile);
-               if( in = fopen (args->infile, "rb") )
-                 while( ch = getc(in), ch != EOF )
-                       if( ch == '\n' )
-                       {
-                         line++;
-
-                         if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
-                               fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
-                         len = 0;
-                       }
-                       else if( len < BT_maxkey )
-                               key[len++] = ch;
-               fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
-               break;
-
        case 'w':
                fprintf(stderr, "started indexing for %s\n", args->infile);
                if( in = fopen (args->infile, "r") )
@@ -3225,13 +3288,7 @@ FILE *in;
                        {
                          line++;
 
-                         if( args->num == 1 )
-                               sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
-
-                         else if( args->num )
-                               sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
-
-                         if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
+                         if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
                                fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
                          len = 0;
                        }
@@ -3240,33 +3297,6 @@ FILE *in;
                fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
                break;
 
-       case 'd':
-               fprintf(stderr, "started deleting keys for %s\n", args->infile);
-               if( in = fopen (args->infile, "rb") )
-                 while( ch = getc(in), ch != EOF )
-                       if( ch == '\n' )
-                       {
-                         line++;
-                         if( args->num == 1 )
-                               sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
-
-                         else if( args->num )
-                               sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
-
-                         if( bt_findkey (bt, key, len, NULL, 0) < 0 )
-                               fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
-                         ptr = (BtKey*)(bt->key);
-                         found++;
-
-                         if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
-                               fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
-                         len = 0;
-                       }
-                       else if( len < BT_maxkey )
-                               key[len++] = ch;
-               fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
-               break;
-
        case 'f':
                fprintf(stderr, "started finding keys for %s\n", args->infile);
                if( in = fopen (args->infile, "rb") )
@@ -3274,12 +3304,6 @@ FILE *in;
                        if( ch == '\n' )
                        {
                          line++;
-                         if( args->num == 1 )
-                               sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
-
-                         else if( args->num )
-                               sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
-
                          if( bt_findkey (bt, key, len, NULL, 0) == 0 )
                                found++;
                          else if( bt->err )
@@ -3324,6 +3348,29 @@ FILE *in;
                fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
                break;
 
+       case 'r':
+               fprintf(stderr, "started reverse scan\n");
+               if( slot = bt_lastkey (bt) )
+                  while( slot = bt_prevkey (bt, slot) ) {
+                       if( slotptr(bt->cursor, slot)->dead )
+                         continue;
+
+                       ptr = keyptr(bt->cursor, slot);
+                       len = ptr->len;
+
+                       if( slotptr(bt->cursor, slot)->type == Duplicate )
+                               len -= BtId;
+
+                       fwrite (ptr->key, len, 1, stdout);
+                       val = valptr(bt->cursor, slot);
+                       fwrite (val->value, val->len, 1, stdout);
+                       fputc ('\n', stdout);
+                       cnt++;
+                 }
+
+               fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
+               break;
+
        case 'c':
 #ifdef unix
                posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
@@ -3343,7 +3390,7 @@ FILE *in;
                }
                
                cnt--;  // remove stopper key
-               fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
+               fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
                break;
        }
 
@@ -3377,10 +3424,12 @@ BtKey *ptr;
 BtDb *bt;
 
        if( argc < 3 ) {
-               fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
-               fprintf (stderr, "  where page_bits is the page size in bits\n");
+               fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
+               fprintf (stderr, "  where idx_file is the name of the btree file\n");
+               fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
+               fprintf (stderr, "  page_bits is the page size in bits\n");
                fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
-               fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
+               fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
                fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
                exit(0);
        }