]> pd.if.org Git - btree/commitdiff
Separate scanning from counting. Utilize file locking during btree creation.
authorunknown <karl@E04.petzent.com>
Thu, 27 Feb 2014 22:43:30 +0000 (14:43 -0800)
committerunknown <karl@E04.petzent.com>
Thu, 27 Feb 2014 22:43:30 +0000 (14:43 -0800)
btree2s.c
btree2t.c
btree2u.c

index 14d9825284a3e307547b43fa75d82c1655444512..19ad3beb64436ce60d78a7e03327f2c2c06895cc 100644 (file)
--- a/btree2s.c
+++ b/btree2s.c
@@ -1844,6 +1844,7 @@ time_t tod[1];
 uint scan = 0;
 uint len = 0;
 uint map = 0;
+uid page_no;
 BtKey ptr;
 BtDb *bt;
 FILE *in;
@@ -1946,9 +1947,55 @@ FILE *in;
                break;
 
        case 's':
-               scan++;
-               break;
+               cnt = len = key[0] = 0;
+
+               if( slot = bt_startkey (bt, key, len) )
+                 slot--;
+               else
+                 fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
+
+               while( slot = bt_nextkey (bt, slot) ) {
+                       ptr = bt_key(bt, slot);
+                       fwrite (ptr->key, ptr->len, 1, stdout);
+                       fputc ('\n', stdout);
+                       cnt++;
+               }
+
+         fprintf(stderr, " Total keys read %d\n", cnt - 1);
+         break;
+
+       case 'c':
+         fprintf(stderr, "started counting\n");
+         cnt = 0;
+
+         page_no = LEAF_page;
+         cnt = 0;
+
+         while( 1 ) {
+         uid off = page_no << bt->page_bits;
+#ifdef unix
+               if( !pread (bt->idx, bt->frame, bt->page_size, off) )
+                       break;
+#else
+               DWORD amt[1];
+
+               SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
 
+               if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL))
+                       break;
+
+               if( *amt <  bt->page_size )
+                       fprintf (stderr, "unable to read page %.8x", page_no);
+#endif
+               if( !bt->frame->free && !bt->frame->lvl )
+                       cnt += bt->frame->act;
+
+               page_no++;
+         }
+               
+         cnt--;        // remove stopper key
+         fprintf(stderr, " Total keys read %d\n", cnt);
+         break;
        }
 
        done = getCpuTime(0);
@@ -1959,24 +2006,6 @@ FILE *in;
        elapsed = getCpuTime(2);
        fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
 
-       dead = cnt = 0;
-       len = key[0] = 0;
-
-       fprintf(stderr, "started reading\n");
-
-       if( slot = bt_startkey (bt, key, len) )
-         slot--;
-       else
-         fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
-
-       while( slot = bt_nextkey (bt, slot) )
-         if( cnt++, scan ) {
-                       ptr = bt_key(bt, slot);
-                       fwrite (ptr->key, ptr->len, 1, stdout);
-                       fputc ('\n', stdout);
-         }
-
-       fprintf(stderr, " Total keys read %d\n", cnt);
        return 0;
 }
 
index 3eb84e0a1b9923cda33926f4a6b4740387b6a87a..54be57f62c0cfa84f369fc30f9bf67f8e565f065 100644 (file)
--- a/btree2t.c
+++ b/btree2t.c
@@ -2383,15 +2383,30 @@ FILE *in;
                break;
 
        case 's':
-               scan++;
+               fprintf(stderr, "started scaning\n");
+               cnt = len = key[0] = 0;
 
-       case 'c':
-         cnt = 0;
+               if( slot = bt_startkey (bt, key, len) )
+                 slot--;
+               else
+                 fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
+
+               while( slot = bt_nextkey (bt, slot) ) {
+                       ptr = bt_key(bt, slot);
+                       fwrite (ptr->key, ptr->len, 1, stdout);
+                       fputc ('\n', stdout);
+                       cnt++;
+               }
 
+               fprintf(stderr, " Total keys read %d\n", cnt - 1);
+               break;
+
+       case 'c':
          fprintf(stderr, "started counting\n");
 
          next = bt->latchmgr->nlatchpage + LATCH_page;
          page_no = LEAF_page;
+         cnt = 0;
 
          while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
          uid off = page_no << bt->page_bits;
index 0c344e87ceeb57a27d377576d6db298ebb0c4853..2e818ace951931bd7a2ee2fa3e5a9462491f709c 100644 (file)
--- a/btree2u.c
+++ b/btree2u.c
@@ -151,7 +151,7 @@ typedef struct {
        uint nlatchpage;                        // number of latch pages at BT_latch
        uint latchtotal;                        // number of page latch entries
        uint latchhash;                         // number of latch hash table slots
-       uint latchvictim;                       // next latch entry to examine
+       uint latchvictim;                       // next latch hash entry to examine
 } BtLatchMgr;
 
 //  latch hash table entries
@@ -164,16 +164,14 @@ typedef struct {
 //     latch manager table structure
 
 typedef struct {
+       volatile uid page_no;           // latch set page number on disk
        BtSpinLatch readwr[1];          // read/write page lock
        BtSpinLatch access[1];          // Access Intent/Page delete
        BtSpinLatch parent[1];          // Posting of fence key in parent
-       BtSpinLatch busy[1];            // slot is being moved between chains
+       volatile ushort dirty;          // page is dirty in cache
        volatile uint next;                     // next entry in hash table chain
        volatile uint prev;                     // prev entry in hash table chain
-       volatile uint hash;                     // hash slot entry is under
-       volatile ushort dirty;          // page is dirty in cache
-       volatile ushort pin;            // number of outstanding pins
-       volatile uid page_no;           // latch set page number on disk
+       volatile uint pin;                      // number of outstanding pins
 } BtLatchSet;
 
 //     The object structure for Btree access
@@ -472,23 +470,23 @@ void bt_spinreleaseread(BtSpinLatch *latch)
 
 //     link latch table entry into head of latch hash table
 
-BTERR bt_latchlink (BtDb *bt, uint hashidx, uint victim, uid page_no)
+BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no)
 {
-BtPage page = (BtPage)(victim * bt->page_size + bt->latchpool);
-BtLatchSet *latch = bt->latchsets + victim;
+BtPage page = (BtPage)(slot * bt->page_size + bt->latchpool);
+BtLatchSet *latch = bt->latchsets + slot;
 off64_t off = page_no << bt->page_bits;
 uint amt[1];
 
        if( latch->next = bt->table[hashidx].slot )
-               bt->latchsets[latch->next].prev = victim;
+               bt->latchsets[latch->next].prev = slot;
 
-       bt->table[hashidx].slot = victim;
+       bt->table[hashidx].slot = slot;
        latch->page_no = page_no;
-       latch->hash = hashidx;
        latch->dirty = 0;
        latch->prev = 0;
+       latch->pin = 1;
 #ifdef unix
-       if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) )
+       if( pread (bt->idx, page, bt->page_size, page_no << bt->page_bits) < bt->page_size )
                return bt->err = BTERR_read;
 #else
        SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
@@ -507,7 +505,7 @@ void bt_unpinlatch (BtLatchSet *latch)
 #ifdef unix
        __sync_fetch_and_add(&latch->pin, -1);
 #else
-       _InterlockedDecrement16 (&latch->pin);
+       _InterlockedDecrement (&latch->pin);
 #endif
 }
 
@@ -517,8 +515,8 @@ void bt_unpinlatch (BtLatchSet *latch)
 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
 {
 uint hashidx = page_no % bt->latchmgr->latchhash;
-uint slot, victim, idx;
 BtLatchSet *latch;
+uint slot, idx;
 off64_t off;
 uint amt[1];
 BtPage page;
@@ -534,84 +532,90 @@ BtPage page;
                break;
   } while( slot = latch->next );
 
-  //  found our entry
+  //  found our entry, bring to front of hash chain
 
   if( slot ) {
        latch = bt->latchsets + slot;
 #ifdef unix
        __sync_fetch_and_add(&latch->pin, 1);
 #else
-       _InterlockedIncrement16 (&latch->pin);
+       _InterlockedIncrement (&latch->pin);
 #endif
+       //  unlink our entry from its hash chain position
+
+       if( latch->prev )
+               bt->latchsets[latch->prev].next = latch->next;
+       else
+               bt->table[hashidx].slot = latch->next;
+
+       if( latch->next )
+               bt->latchsets[latch->next].prev = latch->prev;
+
+       //  now link into head of the hash chain
+
+       if( latch->next = bt->table[hashidx].slot )
+               bt->latchsets[latch->next].prev = slot;
+
+       bt->table[hashidx].slot = slot;
+       latch->prev = 0;
+
        bt_spinreleasewrite(bt->table[hashidx].latch);
        return latch;
   }
 
-       //  see if there are any unused entries
+       //  see if there are any unused pool entries
 #ifdef unix
-       victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
+       slot = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
 #else
-       victim = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
+       slot = _InterlockedIncrement (&bt->latchmgr->latchdeployed);
 #endif
 
-       if( victim < bt->latchmgr->latchtotal ) {
-               latch = bt->latchsets + victim;
-#ifdef unix
-               __sync_fetch_and_add(&latch->pin, 1);
-#else
-               _InterlockedIncrement16 (&latch->pin);
-#endif
-               bt_latchlink (bt, hashidx, victim, page_no);
+       if( slot < bt->latchmgr->latchtotal ) {
+               latch = bt->latchsets + slot;
+               if( bt_latchlink (bt, hashidx, slot, page_no) )
+                       return NULL;
                bt_spinreleasewrite (bt->table[hashidx].latch);
                return latch;
        }
 
 #ifdef unix
-       victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
+       __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
 #else
-       victim = _InterlockedDecrement (&bt->latchmgr->latchdeployed);
+       _InterlockedDecrement (&bt->latchmgr->latchdeployed);
 #endif
-  //  find and reuse previous lock entry
+  //  find and reuse previous lru lock entry on victim hash chain
 
   while( 1 ) {
 #ifdef unix
-       victim = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
+       idx = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
 #else
-       victim = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
+       idx = _InterlockedIncrement (&bt->latchmgr->latchvictim) - 1;
 #endif
-       //      we don't use slot zero
-
-       if( victim %= bt->latchmgr->latchtotal )
-               latch = bt->latchsets + victim;
-       else
-               continue;
-
-       //      take control of our slot
-       //      from other threads
-
-       if( latch->pin || !bt_spinwritetry (latch->busy) )
-               continue;
-
-       idx = latch->hash;
-
        // try to get write lock on hash chain
        //      skip entry if not obtained
        //      or has outstanding locks
 
-       if( !bt_spinwritetry (bt->table[idx].latch) ) {
-               bt_spinreleasewrite (latch->busy);
+       idx %= bt->latchmgr->latchhash;
+
+       if( !bt_spinwritetry (bt->table[idx].latch) )
                continue;
-       }
 
-       if( latch->pin ) {
-               bt_spinreleasewrite (latch->busy);
+       if( slot = bt->table[idx].slot )
+         while( 1 ) {
+               latch = bt->latchsets + slot;
+               if( !latch->next )
+                       break;
+               slot = latch->next;
+         }
+
+       if( !slot || latch->pin ) {
                bt_spinreleasewrite (bt->table[idx].latch);
                continue;
        }
 
        //  update permanent page area in btree
 
-       page = (BtPage)(victim * bt->page_size + bt->latchpool);
+       page = (BtPage)(slot * bt->page_size + bt->latchpool);
        off = latch->page_no << bt->page_bits;
 #ifdef unix
        if( latch->dirty )
@@ -628,7 +632,7 @@ BtPage page;
                return bt->err = BTERR_wrt, NULL;
        }
 #endif
-       //  unlink our available victim from its hash chain
+       //  unlink our available slot from its hash chain
 
        if( latch->prev )
                bt->latchsets[latch->prev].next = latch->next;
@@ -639,14 +643,9 @@ BtPage page;
                bt->latchsets[latch->next].prev = latch->prev;
 
        bt_spinreleasewrite (bt->table[idx].latch);
-#ifdef unix
-       __sync_fetch_and_add(&latch->pin, 1);
-#else
-       _InterlockedIncrement16 (&latch->pin);
-#endif
-       bt_latchlink (bt, hashidx, victim, page_no);
+       if( bt_latchlink (bt, hashidx, slot, page_no) )
+               return NULL;
        bt_spinreleasewrite (bt->table[hashidx].latch);
-       bt_spinreleasewrite (latch->busy);
        return latch;
   }
 }
@@ -722,10 +721,8 @@ struct flock lock[1];
 #endif
 #ifdef unix
        memset (lock, 0, sizeof(lock));
-
-       lock->l_type = F_WRLCK;
        lock->l_len = sizeof(struct BtPage_);
-       lock->l_whence = 0;
+       lock->l_type = F_WRLCK;
 
        if( fcntl (bt->idx, F_SETLKW, lock) < 0 )
                return bt_close (bt), NULL;
@@ -738,7 +735,7 @@ struct flock lock[1];
 
        ovl->OffsetHigh |= 0x80000000;
 
-       if( LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, len, 0L, ovl) )
+       if( LockFileEx (bt->idx, LOCKFILE_EXCLUSIVE_LOCK, 0, sizeof(struct BtPage_), 0L, ovl) )
                return bt_close (bt), NULL;
 #endif 
 
@@ -783,7 +780,7 @@ struct flock lock[1];
 
        //  calculate number of latch hash table entries
 
-       nlatchpage = (nodemax/8 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
+       nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + bt->page_size - 1) / bt->page_size;
        latchhash = nlatchpage * bt->page_size / sizeof(BtHashEntry);
 
        nlatchpage += nodemax;          // size of the buffer pool in pages
@@ -965,8 +962,10 @@ int reuse;
        // else allocate empty page
 
        if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
-         latch = bt_pinlatch (bt, new_page);
-         temp = bt_mappage (bt, latch);
+         if( latch = bt_pinlatch (bt, new_page) )
+               temp = bt_mappage (bt, latch);
+         else
+               return 0;
 
          bt_putid(bt->latchmgr->alloc[1].right, bt_getid(temp->right));
          bt_spinreleasewrite(bt->latchmgr->lock);
@@ -1116,8 +1115,10 @@ uint mode, prevmode;
        // determine lock mode of drill level
        mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
 
-       bt->latch = bt_pinlatch(bt, page_no);
-       bt->page_no = page_no;
+       if( bt->latch = bt_pinlatch(bt, page_no) )
+               bt->page_no = page_no;
+       else
+               return 0;
 
        // obtain access lock using lock chaining
 
@@ -1253,12 +1254,13 @@ uint idx;
                break;
 
        child = bt_getid (slotptr(root, idx)->id);
-       latch = bt_pinlatch (bt, child);
+       if( latch = bt_pinlatch (bt, child) )
+               temp = bt_mappage (bt, latch);
+       else
+               return bt->err;
 
        bt_lockpage (BtLockDelete, latch);
        bt_lockpage (BtLockWrite, latch);
-
-       temp = bt_mappage (bt, latch);
        memcpy (root, temp, bt->page_size);
 
        if( bt_update (bt, root, bt->latch) )
@@ -1359,10 +1361,12 @@ BtKey ptr;
 
        // obtain lock on right page
 
-       rlatch = bt_pinlatch (bt, right);
-       bt_lockpage(BtLockWrite, rlatch);
+       if( rlatch = bt_pinlatch (bt, right) )
+               temp = bt_mappage (bt, rlatch);
+       else
+               return bt->err;
 
-       temp = bt_mappage (bt, rlatch);
+       bt_lockpage(BtLockWrite, rlatch);
 
        if( temp->kill ) {
                bt_abort(bt, temp, right, 0);
@@ -1655,8 +1659,10 @@ BtKey key;
 
        //      lock right page
 
-       rlatch = bt_pinlatch (bt, right);
-       bt_lockpage (BtLockParent, rlatch);
+       if( rlatch = bt_pinlatch (bt, right) )
+               bt_lockpage (BtLockParent, rlatch);
+       else
+               return bt->err;
 
        // update left (containing) node
 
@@ -1809,8 +1815,11 @@ off64_t right;
                break;
 
        bt->cursor_page = right;
-       latch = bt_pinlatch (bt, right);
-    bt_lockpage(BtLockRead, latch);
+
+       if( latch = bt_pinlatch (bt, right) )
+       bt_lockpage(BtLockRead, latch);
+       else
+               return 0;
 
        bt->page = bt_mappage (bt, latch);
        memcpy (bt->cursor, bt->page, bt->page_size);
@@ -1903,11 +1912,6 @@ BtKey ptr;
 
          if( idx = bt->table[hashidx].slot ) do {
                latch = bt->latchsets + idx;
-               if( *(ushort *)latch->busy )
-                       fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
-               *(ushort *)latch->busy = 0;
-               if( latch->hash != hashidx )
-                       fprintf(stderr, "latchset %d wrong hashidx\n", idx);
                if( latch->pin )
                        fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
          } while( idx = latch->next );
@@ -2109,7 +2113,23 @@ FILE *in;
                break;
 
        case 's':
-               scan++;
+               fprintf(stderr, "started scaning\n");
+               cnt = len = key[0] = 0;
+
+               if( slot = bt_startkey (bt, key, len) )
+                 slot--;
+               else
+                 fprintf(stderr, "Error %d in StartKey. Syserror: %d\n", bt->err, errno), exit(0);
+
+               while( slot = bt_nextkey (bt, slot) ) {
+                       ptr = bt_key(bt, slot);
+                       fwrite (ptr->key, ptr->len, 1, stdout);
+                       fputc ('\n', stdout);
+                       cnt++;
+               }
+
+               fprintf(stderr, " Total keys read %d\n", cnt - 1);
+               break;
 
        case 'c':
          fprintf(stderr, "started counting\n");
@@ -2119,24 +2139,25 @@ FILE *in;
          page_no = LEAF_page;
 
          while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
-         uid off = page_no << bt->page_bits;
-#ifdef unix
-               pread (bt->idx, bt->frame, bt->page_size, off);
-#else
-               DWORD amt[1];
-
-               SetFilePointer (bt->idx, (long)off, NULL, FILE_BEGIN);
-
-               if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL))
-                       fprintf (stderr, "unable to read page %.8x", page_no);
-
-               if( *amt <  bt->page_size )
-                       fprintf (stderr, "unable to read page %.8x", page_no);
-#endif
-               if( !bt->frame->free && !bt->frame->lvl )
-                       cnt += bt->frame->act;
+         BtLatchSet *latch;
+         BtPage page;
+               if( latch = bt_pinlatch (bt, page_no) )
+                       page = bt_mappage (bt, latch);
+               if( !page->free && !page->lvl )
+                       cnt += page->act;
                if( page_no > LEAF_page )
                        next = page_no + 1;
+               if( scan )
+                for( idx = 0; idx++ < page->cnt; ) {
+                 if( slotptr(page, idx)->dead )
+                       continue;
+                 ptr = keyptr(page, idx);
+                 if( idx != page->cnt && bt_getid (page->right) ) {
+                       fwrite (ptr->key, ptr->len, 1, stdout);
+                       fputc ('\n', stdout);
+                 }
+                }
+               bt_unpinlatch (latch);
                page_no = next;
          }