]> pd.if.org Git - btree/commitdiff
make stopper key a virtual infinite fence
authorunknown <karl@E04.petzent.com>
Thu, 30 Jan 2014 16:10:23 +0000 (08:10 -0800)
committerunknown <karl@E04.petzent.com>
Thu, 30 Jan 2014 16:10:23 +0000 (08:10 -0800)
fosterbtreee.c
fosterbtreef.c
fosterbtreeg.c

index e1fc87ff4fa368a55cbc69286e09555bb771a1e8..89be3c0c9bfc560385d30afc3194d493e26f38b7 100644 (file)
@@ -217,7 +217,6 @@ typedef struct {
        uint mode;                                      // read-write mode
 #ifdef unix
        int idx;
-       char *pooladvise;                       // bit maps for pool page advisements
 #else
        HANDLE idx;
 #endif
@@ -722,7 +721,6 @@ uint slot;
        free (mgr->pool);
        free (mgr->hash);
        free (mgr->latch);
-       free (mgr->pooladvise);
        free (mgr);
 #else
        FlushFileBuffers(mgr->idx);
@@ -855,7 +853,6 @@ SYSTEM_INFO sysinfo[1];
        mgr->pool = calloc (poolmax, sizeof(BtPool));
        mgr->hash = calloc (hashsize, sizeof(ushort));
        mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
-       mgr->pooladvise = calloc (poolmax, (mgr->poolmask + 8) / 8);
 #else
        mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
        mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
@@ -1165,8 +1162,6 @@ int flag;
        pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
        if( pool->map == MAP_FAILED )
                return bt->err = BTERR_map;
-       // clear out madvise issued bits
-       memset (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8) / 8), 0, (bt->mgr->poolmask + 8)/8);
 #else
        flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
        pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
@@ -1189,17 +1184,6 @@ uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
 BtPage page;
 
        page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
-#ifdef unix
-       {
-       uint idx = subpage / 8;
-       uint bit = subpage % 8;
-
-               if( ~((bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] >> bit) & 1 ) {
-                 madvise (page, bt->mgr->page_size, MADV_WILLNEED);
-                 (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] |= 1 << bit;
-               }
-       }
-#endif
        return page;
 }
 
@@ -1477,19 +1461,34 @@ int reuse;
 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
 {
 uint diff, higher = bt->page->cnt, low = 1, slot;
+uint good = 0;
 
-       //      low is the lowest candidate, higher is already
-       //      tested as .ge. the given key, loop ends when they meet
+       //      if no right link
+       //        make stopper key an infinite fence value
+       //        by setting the good flag
+
+       if( bt_getid (bt->page->right) )
+               higher++;
+       else
+               good++;
+
+       //      low is the next candidate.
+       //  loop ends when they meet
+
+       //  if good, higher is already
+       //      tested as .ge. the given key.
 
        while( diff = higher - low ) {
                slot = low + ( diff >> 1 );
                if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
                        low = slot + 1;
                else
-                       higher = slot;
+                       higher = slot, good++;
        }
 
-       return higher;
+       //      return zero if key is on right link page
+
+       return good ? higher : 0;
 }
 
 //  find and load page at given level for given key
@@ -1557,60 +1556,57 @@ int foster = 0;
                }
          }
 
-       prevpage = bt->page_no;
-       prevpool = bt->pool;
-       prevset = bt->set;
-       prevmode = mode;
-
-       //      were we supposed to find a foster child?
-       //      if so, slide right onto it
-
-       if( keycmp (keyptr(bt->page,bt->page->cnt), key, len) < 0 ) {
-               page_no = bt_getid(bt->page->right);
-               foster = 1;
-               continue;
-       }
-
        //  find key on page at this level
        //  and either descend to requested level
        //      or return key slot
 
-       slot = bt_findslot (bt, key, len);
-
-       //      is this slot < foster child area
-       //      on the requested level?
+       if( slot = bt_findslot (bt, key, len) ) {
+         //    is this slot < foster child area
+         //    on the requested level?
 
-       //      if so, return actual slot even if dead
+         //    if so, return actual slot even if dead
 
-       if( slot <= bt->page->cnt - bt->page->foster )
-         if( drill == lvl )
+         if( slot <= bt->page->cnt - bt->page->foster )
+          if( drill == lvl )
                return bt->foster = foster, slot;
 
-       //      find next active slot
+         //    find next active slot
+         //    note: foster children are never dead
 
-       //      note: foster children are never dead
-
-       while( slotptr(bt->page, slot)->dead )
-         if( slot++ < bt->page->cnt )
+         while( slotptr(bt->page, slot)->dead )
+          if( slot++ < bt->page->cnt )
                continue;
-         else {
+          else {
                //  we are waiting for fence key posting
                page_no = bt_getid(bt->page->right);
-               continue;
+               goto slideright;
          }
 
-       //      is this slot < foster child area
-       //      if so, drill to next level
+        //     is this slot < foster child area
+        //     if so, drill to next level
 
-       if( slot <= bt->page->cnt - bt->page->foster )
+        if( slot <= bt->page->cnt - bt->page->foster )
                foster = 0, drill--;
-       else
+        else
                foster = 1;
 
-       //  continue right onto foster child
-       //      or down to next level.
+         //  continue right onto foster child
+         //    or down to next level.
+
+         page_no = bt_getid(slotptr(bt->page, slot)->id);
+
+       //  or slide right into next page
+
+       } else {
+               page_no = bt_getid(bt->page->right);
+               foster = 1;
+       }
 
-       page_no = bt_getid(slotptr(bt->page, slot)->id);
+slideright:
+       prevpage = bt->page_no;
+       prevpool = bt->pool;
+       prevset = bt->set;
+       prevmode = mode;
 
   } while( page_no );
 
@@ -2512,6 +2508,54 @@ uint bt_tod(BtDb *bt, uint slot)
 
 #ifdef STANDALONE
 
+void bt_latchaudit (BtDb *bt)
+{
+ushort idx, hashidx;
+BtLatchSet *set;
+BtPool *pool;
+BtPage page;
+uid page_no;
+
+#ifdef unix
+       for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
+               set = bt->mgr->latchsets + idx;
+               if( set->pin ) {
+                       fprintf(stderr, "latchset %d pinned\n", idx);
+                       set->pin = 0;
+               }
+       }
+
+       for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
+         if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
+               set = bt->mgr->latchsets + idx;
+               if( set->hash != hashidx )
+                       fprintf(stderr, "latchset %d wrong hashidx\n", idx);
+               if( set->pin )
+                       fprintf(stderr, "latchset %d pinned\n", idx);
+         } while( idx = set->next );
+       }
+       page_no = LEAF_page;
+
+       while( page_no ) {
+               fprintf(stderr, "page: %.6x\n", (uint)page_no);
+               pool = bt_pinpool (bt, page_no);
+               page = bt_page (bt, pool, page_no);
+           page_no = bt_getid(page->right);
+               bt_unpinpool (pool);
+       }
+
+       page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
+
+       while( page_no ) {
+               fprintf(stderr, "free: %.6x\n", (uint)page_no);
+               pool = bt_pinpool (bt, page_no);
+               page = bt_page (bt, pool, page_no);
+           page_no = bt_getid(page->right);
+               bt_unpinpool (pool);
+       }
+#endif
+}
+
 typedef struct {
        char type, idx;
        char *infile;
@@ -2546,6 +2590,12 @@ FILE *in;
 
        switch(args->type | 0x20)
        {
+       case 'a':
+               fprintf(stderr, "started latch mgr audit\n");
+               bt_latchaudit (bt);
+               fprintf(stderr, "finished latch mgr audit\n");
+               break;
+
        case 'w':
                fprintf(stderr, "started indexing for %s\n", args->infile);
                if( in = fopen (args->infile, "rb") )
index c99f1251a59329b0d9d50de08cb3378f1b8dd322..c4379007b2a69cdab1d098ae56bee193f8510975 100644 (file)
@@ -209,7 +209,6 @@ typedef struct {
        uint mode;                                      // read-write mode
 #ifdef unix
        int idx;
-       char *pooladvise;                       // bit maps for pool page advisements
 #else
        HANDLE idx;
 #endif
@@ -685,7 +684,6 @@ uint slot;
        free (mgr->pool);
        free (mgr->hash);
        free (mgr->latch);
-       free (mgr->pooladvise);
        free (mgr);
 #else
        FlushFileBuffers(mgr->idx);
@@ -818,7 +816,6 @@ SYSTEM_INFO sysinfo[1];
        mgr->pool = calloc (poolmax, sizeof(BtPool));
        mgr->hash = calloc (hashsize, sizeof(ushort));
        mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
-       mgr->pooladvise = calloc (poolmax, (mgr->poolmask + 8) / 8);
 #else
        mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
        mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
@@ -1067,8 +1064,6 @@ int flag;
        pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
        if( pool->map == MAP_FAILED )
                return bt->err = BTERR_map;
-       // clear out madvise issued bits
-       memset (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8) / 8), 0, (bt->mgr->poolmask + 8)/8);
 #else
        flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
        pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
@@ -1091,17 +1086,6 @@ uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
 BtPage page;
 
        page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
-#ifdef unix
-       {
-       uint idx = subpage / 8;
-       uint bit = subpage % 8;
-
-               if( ~((bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] >> bit) & 1 ) {
-                 madvise (page, bt->mgr->page_size, MADV_WILLNEED);
-                 (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] |= 1 << bit;
-               }
-       }
-#endif
        return page;
 }
 
@@ -1379,25 +1363,40 @@ int reuse;
 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
 {
 uint diff, higher = bt->page->cnt, low = 1, slot;
+uint good = 0;
 
-       //      low is the lowest candidate, higher is already
-       //      tested as .ge. the given key, loop ends when they meet
+       //      if no right link
+       //        make stopper key an infinite fence value
+       //        by setting the good flag
+
+       if( bt_getid (bt->page->right) )
+               higher++;
+       else
+               good++;
+
+       //      low is the next candidate.
+       //  loop ends when they meet
+
+       //  if good, higher is already
+       //      tested as .ge. the given key.
 
        while( diff = higher - low ) {
                slot = low + ( diff >> 1 );
                if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
                        low = slot + 1;
                else
-                       higher = slot;
+                       higher = slot, good++;
        }
 
-       return higher;
+       //      return zero if key is on right link page
+
+       return good ? higher : 0;
 }
 
 //  find and load page at given level for given key
 //     leave page rd or wr locked as requested
 
-int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, BtLock lock)
+uint bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, BtLock lock)
 {
 uid page_no = ROOT_page, prevpage = 0;
 BtLatchSet *set, *prevset;
@@ -1459,60 +1458,57 @@ int foster = 0;
                }
          }
 
-       prevpage = bt->page_no;
-       prevpool = bt->pool;
-       prevset = bt->set;
-       prevmode = mode;
-
-       //      were we supposed to find a foster child?
-       //      if so, slide right onto it
-
-       if( keycmp (keyptr(bt->page,bt->page->cnt), key, len) < 0 ) {
-               page_no = bt_getid(bt->page->right);
-               foster = 1;
-               continue;
-       }
-
        //  find key on page at this level
        //  and either descend to requested level
        //      or return key slot
 
-       slot = bt_findslot (bt, key, len);
+       if( slot = bt_findslot (bt, key, len) ) {
+         //    is this slot < foster child area
+         //    on the requested level?
 
-       //      is this slot < foster child area
-       //      on the requested level?
+         //    if so, return actual slot even if dead
 
-       //      if so, return actual slot even if dead
-
-       if( slot <= bt->page->cnt - bt->page->foster )
-         if( drill == lvl )
+         if( slot <= bt->page->cnt - bt->page->foster )
+          if( drill == lvl )
                return bt->foster = foster, slot;
 
-       //      find next active slot
-
-       //      note: foster children are never dead
+         //    find next active slot
+         //    note: foster children are never dead
 
-       while( slotptr(bt->page, slot)->dead )
-         if( slot++ < bt->page->cnt )
+         while( slotptr(bt->page, slot)->dead )
+          if( slot++ < bt->page->cnt )
                continue;
-         else {
+          else {
                //  we are waiting for fence key posting
                page_no = bt_getid(bt->page->right);
-               continue;
+               goto slideright;
          }
 
-       //      is this slot < foster child area
-       //      if so, drill to next level
+        //     is this slot < foster child area
+        //     if so, drill to next level
 
-       if( slot <= bt->page->cnt - bt->page->foster )
+        if( slot <= bt->page->cnt - bt->page->foster )
                foster = 0, drill--;
-       else
+        else
                foster = 1;
 
-       //  continue right onto foster child
-       //      or down to next level.
+         //  continue right onto foster child
+         //    or down to next level.
 
-       page_no = bt_getid(slotptr(bt->page, slot)->id);
+         page_no = bt_getid(slotptr(bt->page, slot)->id);
+
+       //  or slide right into next page
+
+       } else {
+               page_no = bt_getid(bt->page->right);
+               foster = 1;
+       }
+
+slideright:
+       prevpage = bt->page_no;
+       prevpool = bt->pool;
+       prevset = bt->set;
+       prevmode = mode;
 
   } while( page_no );
 
@@ -2414,6 +2410,54 @@ uint bt_tod(BtDb *bt, uint slot)
 
 #ifdef STANDALONE
 
+void bt_latchaudit (BtDb *bt)
+{
+ushort idx, hashidx;
+BtLatchSet *set;
+BtPool *pool;
+BtPage page;
+uid page_no;
+
+#ifdef unix
+       for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
+               set = bt->mgr->latchsets + idx;
+               if( *(ushort *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent ) {
+                       fprintf(stderr, "latchset %d locked for page %6x\n", idx, set->page_no);
+                       *(ushort *)set->readwr = 0;
+                       *(ushort *)set->access = 0;
+                       *(ushort *)set->parent = 0;
+               }
+               if( set->pin ) {
+                       fprintf(stderr, "latchset %d pinned\n", idx);
+                       set->pin = 0;
+               }
+       }
+
+       for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
+         if( *(uint *)bt->mgr->latchmgr->table[hashidx].latch )
+               fprintf(stderr, "latchmgr locked\n");
+         if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
+               set = bt->mgr->latchsets + idx;
+               if( *(uint *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent )
+                       fprintf(stderr, "latchset %d locked\n", idx);
+               if( set->hash != hashidx )
+                       fprintf(stderr, "latchset %d wrong hashidx\n", idx);
+               if( set->pin )
+                       fprintf(stderr, "latchset %d pinned\n", idx);
+         } while( idx = set->next );
+       }
+       page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
+
+       while( page_no ) {
+               fprintf(stderr, "free: %.6x\n", (uint)page_no);
+               pool = bt_pinpool (bt, page_no);
+               page = bt_page (bt, pool, page_no);
+           page_no = bt_getid(page->right);
+               bt_unpinpool (pool);
+       }
+#endif
+}
+
 typedef struct {
        char type, idx;
        char *infile;
@@ -2448,6 +2492,12 @@ FILE *in;
 
        switch(args->type | 0x20)
        {
+       case 'a':
+               fprintf(stderr, "started latch mgr audit\n");
+               bt_latchaudit (bt);
+               fprintf(stderr, "finished latch mgr audit\n");
+               break;
+
        case 'w':
                fprintf(stderr, "started indexing for %s\n", args->infile);
                if( in = fopen (args->infile, "rb") )
index 9bccea4ab04f5af48f6b29aacf05207e4fc6eded..e43f57677abd410f569741dd144f7f7f31b682b0 100644 (file)
@@ -218,7 +218,6 @@ typedef struct {
        uint mode;                                      // read-write mode
 #ifdef unix
        int idx;
-       char *pooladvise;                       // bit maps for pool page advisements
 #else
        HANDLE idx;
 #endif
@@ -728,7 +727,6 @@ uint slot;
        free (mgr->pool);
        free (mgr->hash);
        free (mgr->latch);
-       free (mgr->pooladvise);
        free (mgr);
 #else
        FlushFileBuffers(mgr->idx);
@@ -861,7 +859,6 @@ SYSTEM_INFO sysinfo[1];
        mgr->pool = calloc (poolmax, sizeof(BtPool));
        mgr->hash = calloc (hashsize, sizeof(ushort));
        mgr->latch = calloc (hashsize, sizeof(BtLatch));
-       mgr->pooladvise = calloc (poolmax, (mgr->poolmask + 8) / 8);
 #else
        mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
        mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
@@ -1110,8 +1107,6 @@ int flag;
        pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
        if( pool->map == MAP_FAILED )
                return bt->err = BTERR_map;
-       // clear out madvise issued bits
-       memset (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8) / 8), 0, (bt->mgr->poolmask + 8)/8);
 #else
        flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
        pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
@@ -1134,17 +1129,6 @@ uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
 BtPage page;
 
        page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
-#ifdef unix
-       {
-       uint idx = subpage / 8;
-       uint bit = subpage % 8;
-
-               if( ~((bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] >> bit) & 1 ) {
-                 madvise (page, bt->mgr->page_size, MADV_WILLNEED);
-                 (bt->mgr->pooladvise + pool->slot * ((bt->mgr->poolmask + 8)/8))[idx] |= 1 << bit;
-               }
-       }
-#endif
        return page;
 }
 
@@ -1422,19 +1406,34 @@ int reuse;
 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
 {
 uint diff, higher = bt->page->cnt, low = 1, slot;
+uint good = 0;
+
+       //      if no right link
+       //        make stopper key an infinite fence value
+       //        by setting the good flag
+
+       if( bt_getid (bt->page->right) )
+               higher++;
+       else
+               good++;
+
+       //      low is the next candidate.
+       //  loop ends when they meet
 
-       //      low is the lowest candidate, higher is already
-       //      tested as .ge. the given key, loop ends when they meet
+       //  if good, higher is already
+       //      tested as .ge. the given key.
 
        while( diff = higher - low ) {
                slot = low + ( diff >> 1 );
                if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
                        low = slot + 1;
                else
-                       higher = slot;
+                       higher = slot, good++;
        }
 
-       return higher;
+       //      return zero if key is on right link page
+
+       return good ? higher : 0;
 }
 
 //  find and load page at given level for given key
@@ -1502,60 +1501,57 @@ int foster = 0;
                }
          }
 
-       prevpage = bt->page_no;
-       prevpool = bt->pool;
-       prevset = bt->set;
-       prevmode = mode;
-
-       //      were we supposed to find a foster child?
-       //      if so, slide right onto it
-
-       if( keycmp (keyptr(bt->page,bt->page->cnt), key, len) < 0 ) {
-               page_no = bt_getid(bt->page->right);
-               foster = 1;
-               continue;
-       }
-
        //  find key on page at this level
        //  and either descend to requested level
        //      or return key slot
 
-       slot = bt_findslot (bt, key, len);
+       if( slot = bt_findslot (bt, key, len) ) {
+         //    is this slot < foster child area
+         //    on the requested level?
 
-       //      is this slot < foster child area
-       //      on the requested level?
+         //    if so, return actual slot even if dead
 
-       //      if so, return actual slot even if dead
-
-       if( slot <= bt->page->cnt - bt->page->foster )
-         if( drill == lvl )
+         if( slot <= bt->page->cnt - bt->page->foster )
+          if( drill == lvl )
                return bt->foster = foster, slot;
 
-       //      find next active slot
-
-       //      note: foster children are never dead
+         //    find next active slot
+         //    note: foster children are never dead
 
-       while( slotptr(bt->page, slot)->dead )
-         if( slot++ < bt->page->cnt )
+         while( slotptr(bt->page, slot)->dead )
+          if( slot++ < bt->page->cnt )
                continue;
-         else {
+          else {
                //  we are waiting for fence key posting
                page_no = bt_getid(bt->page->right);
-               continue;
+               goto slideright;
          }
 
-       //      is this slot < foster child area
-       //      if so, drill to next level
+        //     is this slot < foster child area
+        //     if so, drill to next level
 
-       if( slot <= bt->page->cnt - bt->page->foster )
+        if( slot <= bt->page->cnt - bt->page->foster )
                foster = 0, drill--;
-       else
+        else
                foster = 1;
 
-       //  continue right onto foster child
-       //      or down to next level.
+         //  continue right onto foster child
+         //    or down to next level.
 
-       page_no = bt_getid(slotptr(bt->page, slot)->id);
+         page_no = bt_getid(slotptr(bt->page, slot)->id);
+
+       //  or slide right into next page
+
+       } else {
+               page_no = bt_getid(bt->page->right);
+               foster = 1;
+       }
+
+slideright:
+       prevpage = bt->page_no;
+       prevpool = bt->pool;
+       prevset = bt->set;
+       prevmode = mode;
 
   } while( page_no );
 
@@ -1674,7 +1670,7 @@ uint idx;
        //      add killed right block to free chain
        //      lock latch mgr
 
-       bt_spinwritelock(bt->mgr->latchmgr->lock);
+       bt_spinwritelock(bt->mgr->latchmgr->lock, 0);
 
        //      store free chain in allocation page second right
 
@@ -1688,7 +1684,7 @@ uint idx;
        bt_unpinlatch (rset);
        bt_unpinpool (rpool);
 
-       bt_spinreleasewrite(bt->mgr->latchmgr->lock);
+       bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0);
 
        // delete our obsolete fence key from our parent
 
@@ -1845,7 +1841,7 @@ retry:
        //      add ourselves to free chain
        //      lock latch mgr
 
-       bt_spinwritelock(bt->mgr->latchmgr->lock);
+       bt_spinwritelock(bt->mgr->latchmgr->lock, 0);
 
        //      store free chain in allocation page second right
        bt_putid(page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
@@ -1853,7 +1849,7 @@ retry:
 
        // unlock latch mgr and pages
 
-       bt_spinreleasewrite(bt->mgr->latchmgr->lock);
+       bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0);
        bt_unlockpage(BtLockWrite, lset);
        bt_unpinlatch (lset);
        bt_unpinpool (lpool);
@@ -2457,6 +2453,54 @@ uint bt_tod(BtDb *bt, uint slot)
 
 #ifdef STANDALONE
 
+void bt_latchaudit (BtDb *bt)
+{
+ushort idx, hashidx;
+BtLatchSet *set;
+BtPool *pool;
+BtPage page;
+uid page_no;
+
+#ifdef unix
+       for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
+               set = bt->mgr->latchsets + idx;
+               if( *(ushort *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent ) {
+                       fprintf(stderr, "latchset %d locked for page %6x\n", idx, set->page_no);
+                       *(ushort *)set->readwr = 0;
+                       *(ushort *)set->access = 0;
+                       *(ushort *)set->parent = 0;
+               }
+               if( set->pin ) {
+                       fprintf(stderr, "latchset %d pinned\n", idx);
+                       set->pin = 0;
+               }
+       }
+
+       for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
+         if( *(uint *)bt->mgr->latchmgr->table[hashidx].latch )
+               fprintf(stderr, "latchmgr locked\n");
+         if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
+               set = bt->mgr->latchsets + idx;
+               if( *(uint *)set->readwr || *(ushort *)set->access || *(ushort *)set->parent )
+                       fprintf(stderr, "latchset %d locked\n", idx);
+               if( set->hash != hashidx )
+                       fprintf(stderr, "latchset %d wrong hashidx\n", idx);
+               if( set->pin )
+                       fprintf(stderr, "latchset %d pinned\n", idx);
+         } while( idx = set->next );
+       }
+       page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
+
+       while( page_no ) {
+               fprintf(stderr, "free: %.6x\n", (uint)page_no);
+               pool = bt_pinpool (bt, page_no);
+               page = bt_page (bt, pool, page_no);
+           page_no = bt_getid(page->right);
+               bt_unpinpool (pool);
+       }
+#endif
+}
+
 typedef struct {
        char type, idx;
        char *infile;
@@ -2491,6 +2535,12 @@ FILE *in;
 
        switch(args->type | 0x20)
        {
+       case 'a':
+               fprintf(stderr, "started latch mgr audit\n");
+               bt_latchaudit (bt);
+               fprintf(stderr, "finished latch mgr audit\n");
+               break;
+
        case 'w':
                fprintf(stderr, "started indexing for %s\n", args->infile);
                if( in = fopen (args->infile, "rb") )