]> pd.if.org Git - btree/blob - btree2t.c
fix bt_audit to clear latches, remove -lrt linker depenency on linux
[btree] / btree2t.c
1 // btree version 2t
2 //      with reworked bt_deletekey code
3 // 25 FEB 2014
4
5 // author: karl malbrain, malbrain@cal.berkeley.edu
6
7 /*
8 This work, including the source code, documentation
9 and related data, is placed into the public domain.
10
11 The orginal author is Karl Malbrain.
12
13 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
14 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
15 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
16 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
17 RESULTING FROM THE USE, MODIFICATION, OR
18 REDISTRIBUTION OF THIS SOFTWARE.
19 */
20
21 // Please see the project home page for documentation
22 // code.google.com/p/high-concurrency-btree
23
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26
27 #ifdef linux
28 #define _GNU_SOURCE
29 #endif
30
31 #ifdef unix
32 #include <unistd.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <sys/mman.h>
38 #include <errno.h>
39 #else
40 #define WIN32_LEAN_AND_MEAN
41 #include <windows.h>
42 #include <stdio.h>
43 #include <stdlib.h>
44 #include <time.h>
45 #include <fcntl.h>
46 #endif
47
48 #include <memory.h>
49 #include <string.h>
50
51 typedef unsigned long long      uid;
52
53 #ifndef unix
54 typedef unsigned long long      off64_t;
55 typedef unsigned short          ushort;
56 typedef unsigned int            uint;
57 #endif
58
59 #define BT_latchtable   128                                     // number of latch manager slots
60
61 #define BT_ro 0x6f72    // ro
62 #define BT_rw 0x7772    // rw
63 #define BT_fl 0x6c66    // fl
64
65 #define BT_maxbits              24                                      // maximum page size in bits
66 #define BT_minbits              9                                       // minimum page size in bits
67 #define BT_minpage              (1 << BT_minbits)       // minimum page size
68 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
69
70 /*
71 There are five lock types for each node in three independent sets: 
72 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
73 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
74 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
75 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
76 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
77 */
78
79 typedef enum{
80         BtLockAccess,
81         BtLockDelete,
82         BtLockRead,
83         BtLockWrite,
84         BtLockParent
85 }BtLock;
86
87 //      definition for latch implementation
88
89 // exclusive is set for write access
90 // share is count of read accessors
91 // grant write lock when share == 0
92
93 volatile typedef struct {
94         unsigned char mutex[1];
95         unsigned char exclusive:1;
96         unsigned char pending:1;
97         ushort share;
98 } BtSpinLatch;
99
100 //  hash table entries
101
102 typedef struct {
103         BtSpinLatch latch[1];
104         volatile ushort slot;           // Latch table entry at head of chain
105 } BtHashEntry;
106
107 //      latch manager table structure
108
109 typedef struct {
110         BtSpinLatch readwr[1];          // read/write page lock
111         BtSpinLatch access[1];          // Access Intent/Page delete
112         BtSpinLatch parent[1];          // Posting of fence key in parent
113         BtSpinLatch busy[1];            // slot is being moved between chains
114         volatile ushort next;           // next entry in hash table chain
115         volatile ushort prev;           // prev entry in hash table chain
116         volatile ushort pin;            // number of outstanding locks
117         volatile ushort hash;           // hash slot entry is under
118         volatile uid page_no;           // latch set page number
119 } BtLatchSet;
120
121 //      Define the length of the page and key pointers
122
123 #define BtId 6
124
125 //      Page key slot definition.
126
127 //      If BT_maxbits is 15 or less, you can save 2 bytes
128 //      for each key stored by making the first two uints
129 //      into ushorts.  You can also save 4 bytes by removing
130 //      the tod field from the key.
131
132 //      Keys are marked dead, but remain on the page until
133 //      cleanup is called. The fence key (highest key) for
134 //      the page is always present, even if dead.
135
136 typedef struct {
137         uint off:BT_maxbits;            // page offset for key start
138         uint dead:1;                            // set for deleted key
139         uint tod;                                       // time-stamp for key
140         unsigned char id[BtId];         // id associated with key
141 } BtSlot;
142
143 //      The key structure occupies space at the upper end of
144 //      each page.  It's a length byte followed by the value
145 //      bytes.
146
147 typedef struct {
148         unsigned char len;
149         unsigned char key[0];
150 } *BtKey;
151
152 //      The first part of an index page.
153 //      It is immediately followed
154 //      by the BtSlot array of keys.
155
156 typedef struct BtPage_ {
157         uint cnt;                                       // count of keys in page
158         uint act;                                       // count of active keys
159         uint min;                                       // next key offset
160         unsigned char bits:7;           // page size in bits
161         unsigned char free:1;           // page is on free list
162         unsigned char lvl:6;            // level of page
163         unsigned char kill:1;           // page is being deleted
164         unsigned char dirty:1;          // page is dirty
165         unsigned char right[BtId];      // page number to right
166 } *BtPage;
167
168 //      The memory mapping hash table entry
169
170 typedef struct {
171         BtPage page;            // mapped page pointer
172         uid  page_no;           // mapped page number
173         void *lruprev;          // least recently used previous cache block
174         void *lrunext;          // lru next cache block
175         void *hashprev;         // previous cache block for the same hash idx
176         void *hashnext;         // next cache block for the same hash idx
177 #ifndef unix
178         HANDLE hmap;
179 #endif
180 }BtHash;
181
182 typedef struct {
183         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
184         BtSpinLatch lock[1];            // allocation area lite latch
185         ushort latchdeployed;           // highest number of latch entries deployed
186         ushort nlatchpage;                      // number of latch pages at BT_latch
187         ushort latchtotal;                      // number of page latch entries
188         ushort latchhash;                       // number of latch hash table slots
189         ushort latchvictim;                     // next latch entry to examine
190         BtHashEntry table[0];           // the hash table
191 } BtLatchMgr;
192
193 //      The object structure for Btree access
194
195 typedef struct _BtDb {
196         uint page_size;         // each page size       
197         uint page_bits;         // each page size in bits       
198         uint seg_bits;          // segment size in pages in bits
199         uid page_no;            // current page number  
200         uid cursor_page;        // current cursor page number   
201         int  err;
202         uint mode;                      // read-write mode
203         uint mapped_io;         // use memory mapping
204         BtPage temp;            // temporary frame buffer (memory mapped/file IO)
205         BtPage alloc;           // frame buffer for alloc page ( page 0 )
206         BtPage cursor;          // cached frame for start/next (never mapped)
207         BtPage frame;           // spare frame for the page split (never mapped)
208         BtPage zero;            // zeroes frame buffer (never mapped)
209         BtPage page;            // current page
210         BtLatchSet *latch;              // current page latch
211         BtLatchMgr *latchmgr;   // mapped latch page from allocation page
212         BtLatchSet *latchsets;  // mapped latch set from latch pages
213 #ifdef unix
214         int idx;
215 #else
216         HANDLE idx;
217         HANDLE halloc;          // allocation and latch table handle
218 #endif
219         unsigned char *mem;     // frame, cursor, page memory buffer
220         int nodecnt;            // highest page cache segment in use
221         int nodemax;            // highest page cache segment allocated
222         int hashmask;           // number of pages in segments - 1
223         int hashsize;           // size of hash table
224         int found;                      // last deletekey found key
225         BtHash *lrufirst;       // lru list head
226         BtHash *lrulast;        // lru list tail
227         ushort *cache;          // hash table for cached segments
228         BtHash *nodes;          // segment cache
229 } BtDb;
230
231 typedef enum {
232 BTERR_ok = 0,
233 BTERR_notfound,
234 BTERR_struct,
235 BTERR_ovflw,
236 BTERR_lock,
237 BTERR_hash,
238 BTERR_kill,
239 BTERR_map,
240 BTERR_wrt,
241 BTERR_eof
242 } BTERR;
243
244 // B-Tree functions
245 extern void bt_close (BtDb *bt);
246 extern BtDb *bt_open (char *name, uint mode, uint bits, uint cacheblk, uint pgblk, uint hashsize);
247 extern BTERR  bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod);
248 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
249 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
250 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
251 extern uint bt_nextkey   (BtDb *bt, uint slot);
252
253 //      internal functions
254 BTERR bt_update (BtDb *bt, BtPage page, uid page_no);
255 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no);
256 //  Helper functions to return slot values
257
258 extern BtKey bt_key (BtDb *bt, uint slot);
259 extern uid bt_uid (BtDb *bt, uint slot);
260 extern uint bt_tod (BtDb *bt, uint slot);
261
262 //  BTree page number constants
263 #define ALLOC_page              0
264 #define ROOT_page               1
265 #define LEAF_page               2
266 #define LATCH_page              3
267
268 //      Number of levels to create in a new BTree
269
270 #define MIN_lvl                 2
271
272 //  The page is allocated from low and hi ends.
273 //  The key offsets and row-id's are allocated
274 //  from the bottom, while the text of the key
275 //  is allocated from the top.  When the two
276 //  areas meet, the page is split into two.
277
278 //  A key consists of a length byte, two bytes of
279 //  index number (0 - 65534), and up to 253 bytes
280 //  of key value.  Duplicate keys are discarded.
281 //  Associated with each key is a 48 bit row-id.
282
283 //  The b-tree root is always located at page 1.
284 //      The first leaf page of level zero is always
285 //      located on page 2.
286
287 //      The b-tree pages are linked with right
288 //      pointers to facilitate enumerators,
289 //      and provide for concurrency.
290
291 //      When to root page fills, it is split in two and
292 //      the tree height is raised by a new root at page
293 //      one with two keys.
294
295 //      Deleted keys are marked with a dead bit until
296 //      page cleanup The fence key for a node is always
297 //      present, even after deletion and cleanup.
298
299 //  Deleted leaf pages are reclaimed  on a free list.
300 //      The upper levels of the btree are fixed on creation.
301
302 //  Groups of pages from the btree are optionally
303 //  cached with memory mapping. A hash table is used to keep
304 //  track of the cached pages.  This behaviour is controlled
305 //  by the number of cache blocks parameter and pages per block
306 //      given to bt_open.
307
308 //  To achieve maximum concurrency one page is locked at a time
309 //  as the tree is traversed to find leaf key in question. The right
310 //  page numbers are used in cases where the page is being split,
311 //      or consolidated.
312
313 //  Page 0 (ALLOC page) is dedicated to lock for new page extensions,
314 //      and chains empty leaf pages together for reuse.
315
316 //      Parent locks are obtained to prevent resplitting or deleting a node
317 //      before its fence is posted into its upper level.
318
319 //      A special open mode of BT_fl is provided to safely access files on
320 //      WIN32 networks. WIN32 network operations should not use memory mapping.
321 //      This WIN32 mode sets FILE_FLAG_NOBUFFERING and FILE_FLAG_WRITETHROUGH
322 //      to prevent local caching of network file contents.
323
324 //      Access macros to address slot and key values from the page.
325 //      Page slots use 1 based indexing.
326
327 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
328 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
329
330 void bt_putid(unsigned char *dest, uid id)
331 {
332 int i = BtId;
333
334         while( i-- )
335                 dest[i] = (unsigned char)id, id >>= 8;
336 }
337
338 uid bt_getid(unsigned char *src)
339 {
340 uid id = 0;
341 int i;
342
343         for( i = 0; i < BtId; i++ )
344                 id <<= 8, id |= *src++; 
345
346         return id;
347 }
348
349 BTERR bt_abort (BtDb *bt, BtPage page, uid page_no, BTERR err)
350 {
351 BtKey ptr;
352
353         fprintf(stderr, "\n Btree2 abort, error %d on page %.8x\n", err, page_no);
354         fprintf(stderr, "level=%d kill=%d free=%d cnt=%x act=%x\n", page->lvl, page->kill, page->free, page->cnt, page->act);
355         ptr = keyptr(page, page->cnt);
356         fprintf(stderr, "fence='%.*s'\n", ptr->len, ptr->key);
357         fprintf(stderr, "right=%.8x\n", bt_getid(page->right));
358 }
359
360 //      Latch Manager
361
362 //      wait until write lock mode is clear
363 //      and add 1 to the share count
364
365 void bt_spinreadlock(BtSpinLatch *latch)
366 {
367 ushort prev;
368
369   do {
370         //      obtain latch mutex
371 #ifdef unix
372         if( __sync_lock_test_and_set(latch->mutex, 1) )
373                 continue;
374 #else
375         if( _InterlockedExchange8(latch->mutex, 1) )
376                 continue;
377 #endif
378         //  see if exclusive request is granted or pending
379
380         if( prev = !(latch->exclusive | latch->pending) )
381                 latch->share++;
382
383 #ifdef unix
384         *latch->mutex = 0;
385 #else
386         _InterlockedExchange8(latch->mutex, 0);
387 #endif
388
389         if( prev )
390                 return;
391
392 #ifdef  unix
393   } while( sched_yield(), 1 );
394 #else
395   } while( SwitchToThread(), 1 );
396 #endif
397 }
398
399 //      wait for other read and write latches to relinquish
400
401 void bt_spinwritelock(BtSpinLatch *latch)
402 {
403 uint prev;
404
405   do {
406 #ifdef  unix
407         if( __sync_lock_test_and_set(latch->mutex, 1) )
408                 continue;
409 #else
410         if( _InterlockedExchange8(latch->mutex, 1) )
411                 continue;
412 #endif
413         if( prev = !(latch->share | latch->exclusive) )
414                 latch->exclusive = 1, latch->pending = 0;
415         else
416                 latch->pending = 1;
417 #ifdef unix
418         *latch->mutex = 0;
419 #else
420         _InterlockedExchange8(latch->mutex, 0);
421 #endif
422         if( prev )
423                 return;
424 #ifdef  unix
425   } while( sched_yield(), 1 );
426 #else
427   } while( SwitchToThread(), 1 );
428 #endif
429 }
430
431 //      try to obtain write lock
432
433 //      return 1 if obtained,
434 //              0 otherwise
435
436 int bt_spinwritetry(BtSpinLatch *latch)
437 {
438 uint prev;
439
440 #ifdef unix
441         if( __sync_lock_test_and_set(latch->mutex, 1) )
442                 return 0;
443 #else
444         if( _InterlockedExchange8(latch->mutex, 1) )
445                 return 0;
446 #endif
447         //      take write access if all bits are clear
448
449         if( prev = !(latch->exclusive | latch->share) )
450                 latch->exclusive = 1;
451
452 #ifdef unix
453         *latch->mutex = 0;
454 #else
455         _InterlockedExchange8(latch->mutex, 0);
456 #endif
457         return prev;
458 }
459
460 //      clear write mode
461
462 void bt_spinreleasewrite(BtSpinLatch *latch)
463 {
464 #ifdef unix
465         while( __sync_lock_test_and_set(latch->mutex, 1) )
466                 sched_yield();
467 #else
468         while( _InterlockedExchange8(latch->mutex, 1) )
469                 SwitchToThread();
470 #endif
471         latch->exclusive = 0;
472 #ifdef unix
473         *latch->mutex = 0;
474 #else
475         _InterlockedExchange8(latch->mutex, 0);
476 #endif
477 }
478
479 //      decrement reader count
480
481 void bt_spinreleaseread(BtSpinLatch *latch)
482 {
483 #ifdef unix
484         while( __sync_lock_test_and_set(latch->mutex, 1) )
485                 sched_yield();
486 #else
487         while( _InterlockedExchange8(latch->mutex, 1) )
488                 SwitchToThread();
489 #endif
490         latch->share--;
491 #ifdef unix
492         *latch->mutex = 0;
493 #else
494         _InterlockedExchange8(latch->mutex, 0);
495 #endif
496 }
497
498 //      link latch table entry into latch hash table
499
500 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
501 {
502 BtLatchSet *latch = bt->latchsets + victim;
503
504         if( latch->next = bt->latchmgr->table[hashidx].slot )
505                 bt->latchsets[latch->next].prev = victim;
506
507         bt->latchmgr->table[hashidx].slot = victim;
508         latch->page_no = page_no;
509         latch->hash = hashidx;
510         latch->prev = 0;
511 }
512
513 //      release latch pin
514
515 void bt_unpinlatch (BtLatchSet *latch)
516 {
517 #ifdef unix
518         __sync_fetch_and_add(&latch->pin, -1);
519 #else
520         _InterlockedDecrement16 (&latch->pin);
521 #endif
522 }
523
524 //      find existing latchset or inspire new one
525 //      return with latchset pinned
526
527 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
528 {
529 ushort hashidx = page_no % bt->latchmgr->latchhash;
530 ushort slot, avail = 0, victim, idx;
531 BtLatchSet *latch;
532
533         //  obtain read lock on hash table entry
534
535         bt_spinreadlock(bt->latchmgr->table[hashidx].latch);
536
537         if( slot = bt->latchmgr->table[hashidx].slot ) do
538         {
539                 latch = bt->latchsets + slot;
540                 if( page_no == latch->page_no )
541                         break;
542         } while( slot = latch->next );
543
544         if( slot ) {
545 #ifdef unix
546                 __sync_fetch_and_add(&latch->pin, 1);
547 #else
548                 _InterlockedIncrement16 (&latch->pin);
549 #endif
550         }
551
552     bt_spinreleaseread (bt->latchmgr->table[hashidx].latch);
553
554         if( slot )
555                 return latch;
556
557   //  try again, this time with write lock
558
559   bt_spinwritelock(bt->latchmgr->table[hashidx].latch);
560
561   if( slot = bt->latchmgr->table[hashidx].slot ) do
562   {
563         latch = bt->latchsets + slot;
564         if( page_no == latch->page_no )
565                 break;
566         if( !latch->pin && !avail )
567                 avail = slot;
568   } while( slot = latch->next );
569
570   //  found our entry, or take over an unpinned one
571
572   if( slot || (slot = avail) ) {
573         latch = bt->latchsets + slot;
574 #ifdef unix
575         __sync_fetch_and_add(&latch->pin, 1);
576 #else
577         _InterlockedIncrement16 (&latch->pin);
578 #endif
579         latch->page_no = page_no;
580         bt_spinreleasewrite(bt->latchmgr->table[hashidx].latch);
581         return latch;
582   }
583
584         //  see if there are any unused entries
585 #ifdef unix
586         victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, 1) + 1;
587 #else
588         victim = _InterlockedIncrement16 (&bt->latchmgr->latchdeployed);
589 #endif
590
591         if( victim < bt->latchmgr->latchtotal ) {
592                 latch = bt->latchsets + victim;
593 #ifdef unix
594                 __sync_fetch_and_add(&latch->pin, 1);
595 #else
596                 _InterlockedIncrement16 (&latch->pin);
597 #endif
598                 bt_latchlink (bt, hashidx, victim, page_no);
599                 bt_spinreleasewrite (bt->latchmgr->table[hashidx].latch);
600                 return latch;
601         }
602
603 #ifdef unix
604         victim = __sync_fetch_and_add (&bt->latchmgr->latchdeployed, -1);
605 #else
606         victim = _InterlockedDecrement16 (&bt->latchmgr->latchdeployed);
607 #endif
608   //  find and reuse previous lock entry
609
610   while( 1 ) {
611 #ifdef unix
612         victim = __sync_fetch_and_add(&bt->latchmgr->latchvictim, 1);
613 #else
614         victim = _InterlockedIncrement16 (&bt->latchmgr->latchvictim) - 1;
615 #endif
616         //      we don't use slot zero
617
618         if( victim %= bt->latchmgr->latchtotal )
619                 latch = bt->latchsets + victim;
620         else
621                 continue;
622
623         //      take control of our slot
624         //      from other threads
625
626         if( latch->pin || !bt_spinwritetry (latch->busy) )
627                 continue;
628
629         idx = latch->hash;
630
631         // try to get write lock on hash chain
632         //      skip entry if not obtained
633         //      or has outstanding locks
634
635         if( !bt_spinwritetry (bt->latchmgr->table[idx].latch) ) {
636                 bt_spinreleasewrite (latch->busy);
637                 continue;
638         }
639
640         if( latch->pin ) {
641                 bt_spinreleasewrite (latch->busy);
642                 bt_spinreleasewrite (bt->latchmgr->table[idx].latch);
643                 continue;
644         }
645
646         //  unlink our available victim from its hash chain
647
648         if( latch->prev )
649                 bt->latchsets[latch->prev].next = latch->next;
650         else
651                 bt->latchmgr->table[idx].slot = latch->next;
652
653         if( latch->next )
654                 bt->latchsets[latch->next].prev = latch->prev;
655
656         bt_spinreleasewrite (bt->latchmgr->table[idx].latch);
657 #ifdef unix
658         __sync_fetch_and_add(&latch->pin, 1);
659 #else
660         _InterlockedIncrement16 (&latch->pin);
661 #endif
662         bt_latchlink (bt, hashidx, victim, page_no);
663         bt_spinreleasewrite (bt->latchmgr->table[hashidx].latch);
664         bt_spinreleasewrite (latch->busy);
665         return latch;
666   }
667 }
668
669 //      close and release memory
670
671 void bt_close (BtDb *bt)
672 {
673 BtHash *hash;
674 #ifdef unix
675         // release mapped pages
676
677         if( hash = bt->lrufirst )
678                 do munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
679                 while(hash = hash->lrunext);
680
681         if( bt->mem )
682                 free (bt->mem);
683         close (bt->idx);
684         free (bt->cache);
685         free (bt);
686 #else
687         if( hash = bt->lrufirst )
688           do
689           {
690                 FlushViewOfFile(hash->page, 0);
691                 UnmapViewOfFile(hash->page);
692                 CloseHandle(hash->hmap);
693           } while(hash = hash->lrunext);
694
695         if( bt->mem)
696                 VirtualFree (bt->mem, 0, MEM_RELEASE);
697         FlushFileBuffers(bt->idx);
698         CloseHandle(bt->idx);
699         GlobalFree (bt->cache);
700         GlobalFree (bt);
701 #endif
702 }
703 //  open/create new btree
704
705 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
706 //              size of mapped page pool (e.g. 8192)
707
708 BtDb *bt_open (char *name, uint mode, uint bits, uint nodemax, uint segsize, uint hashsize)
709 {
710 uint lvl, attr, cacheblk, last, slot, idx;
711 uint nlatchpage, latchhash;
712 BtLatchMgr *latchmgr;
713 off64_t size;
714 uint amt[1];
715 BtKey key;
716 BtDb* bt;
717 int flag;
718
719 #ifndef unix
720 SYSTEM_INFO sysinfo[1];
721 #endif
722
723         // determine sanity of page size and buffer pool
724
725         if( bits > BT_maxbits )
726                 bits = BT_maxbits;
727         else if( bits < BT_minbits )
728                 bits = BT_minbits;
729
730 #ifdef unix
731         bt = calloc (1, sizeof(BtDb));
732
733         bt->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
734
735         if( bt->idx == -1 )
736                 return free(bt), NULL;
737         
738         cacheblk = 4096;        // minimum mmap segment size for unix
739
740 #else
741         bt = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtDb));
742         attr = FILE_ATTRIBUTE_NORMAL;
743         bt->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
744
745         if( bt->idx == INVALID_HANDLE_VALUE )
746                 return GlobalFree(bt), NULL;
747
748         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
749         GetSystemInfo(sysinfo);
750         cacheblk = sysinfo->dwAllocationGranularity;
751 #endif
752
753 #ifdef unix
754         latchmgr = malloc (BT_maxpage);
755         *amt = 0;
756
757         // read minimum page size to get root info
758
759         if( size = lseek (bt->idx, 0L, 2) ) {
760                 if( pread(bt->idx, latchmgr, BT_minpage, 0) == BT_minpage )
761                         bits = latchmgr->alloc->bits;
762                 else
763                         return free(bt), free(latchmgr), NULL;
764         } else if( mode == BT_ro )
765                 return free(latchmgr), bt_close (bt), NULL;
766 #else
767         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
768         size = GetFileSize(bt->idx, amt);
769
770         if( size || *amt ) {
771                 if( !ReadFile(bt->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
772                         return bt_close (bt), NULL;
773                 bits = latchmgr->alloc->bits;
774         } else if( mode == BT_ro )
775                 return bt_close (bt), NULL;
776 #endif
777
778         bt->page_size = 1 << bits;
779         bt->page_bits = bits;
780
781         bt->mode = mode;
782
783         if( cacheblk < bt->page_size )
784                 cacheblk = bt->page_size;
785
786         //  mask for partial memmaps
787
788         bt->hashmask = (cacheblk >> bits) - 1;
789
790         //      see if requested size of pages per memmap is greater
791
792         if( (1 << segsize) > bt->hashmask )
793                 bt->hashmask = (1 << segsize) - 1;
794
795         bt->seg_bits = 0;
796
797         while( (1 << bt->seg_bits) <= bt->hashmask )
798                 bt->seg_bits++;
799
800         bt->hashsize = hashsize;
801
802         if( bt->nodemax = nodemax ) {
803 #ifdef unix
804           bt->nodes = calloc (nodemax, sizeof(BtHash));
805           bt->cache = calloc (hashsize, sizeof(ushort));
806 #else
807           bt->nodes = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, nodemax * sizeof(BtHash));
808           bt->cache = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
809 #endif
810           bt->mapped_io = 1;
811         }
812
813         if( size || *amt )
814                 goto btlatch;
815
816         // initialize an empty b-tree with latch page, root page, page of leaves
817         // and page(s) of latches
818
819         memset (latchmgr, 0, 1 << bits);
820         nlatchpage = BT_latchtable / (bt->page_size / sizeof(BtLatchSet)) + 1; 
821         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
822         latchmgr->alloc->bits = bt->page_bits;
823
824         latchmgr->nlatchpage = nlatchpage;
825         latchmgr->latchtotal = nlatchpage * (bt->page_size / sizeof(BtLatchSet));
826
827         //  initialize latch manager
828
829         latchhash = (bt->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
830
831         //      size of hash table = total number of latchsets
832
833         if( latchhash > latchmgr->latchtotal )
834                 latchhash = latchmgr->latchtotal;
835
836         latchmgr->latchhash = latchhash;
837
838 #ifdef unix
839         if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
840                 return bt_close (bt), NULL;
841 #else
842         if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
843                 return bt_close (bt), NULL;
844
845         if( *amt < bt->page_size )
846                 return bt_close (bt), NULL;
847 #endif
848
849         memset (latchmgr, 0, 1 << bits);
850         latchmgr->alloc->bits = bt->page_bits;
851
852         for( lvl=MIN_lvl; lvl--; ) {
853                 slotptr(latchmgr->alloc, 1)->off = bt->page_size - 3;
854                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
855                 key = keyptr(latchmgr->alloc, 1);
856                 key->len = 2;           // create stopper key
857                 key->key[0] = 0xff;
858                 key->key[1] = 0xff;
859                 latchmgr->alloc->min = bt->page_size - 3;
860                 latchmgr->alloc->lvl = lvl;
861                 latchmgr->alloc->cnt = 1;
862                 latchmgr->alloc->act = 1;
863 #ifdef unix
864                 if( write (bt->idx, latchmgr, bt->page_size) < bt->page_size )
865                         return bt_close (bt), NULL;
866 #else
867                 if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
868                         return bt_close (bt), NULL;
869
870                 if( *amt < bt->page_size )
871                         return bt_close (bt), NULL;
872 #endif
873         }
874
875         // clear out latch manager locks
876         //      and rest of pages to round out segment
877
878         memset(latchmgr, 0, bt->page_size);
879         last = MIN_lvl + 1;
880
881         while( last <= ((MIN_lvl + 1 + nlatchpage) | bt->hashmask) ) {
882 #ifdef unix
883                 pwrite(bt->idx, latchmgr, bt->page_size, last << bt->page_bits);
884 #else
885                 SetFilePointer (bt->idx, last << bt->page_bits, NULL, FILE_BEGIN);
886                 if( !WriteFile (bt->idx, (char *)latchmgr, bt->page_size, amt, NULL) )
887                         return bt_close (bt), NULL;
888                 if( *amt < bt->page_size )
889                         return bt_close (bt), NULL;
890 #endif
891                 last++;
892         }
893
894 btlatch:
895 #ifdef unix
896         flag = PROT_READ | PROT_WRITE;
897         bt->latchmgr = mmap (0, bt->page_size, flag, MAP_SHARED, bt->idx, ALLOC_page * bt->page_size);
898         if( bt->latchmgr == MAP_FAILED )
899                 return bt_close (bt), NULL;
900         bt->latchsets = (BtLatchSet *)mmap (0, bt->latchmgr->nlatchpage * bt->page_size, flag, MAP_SHARED, bt->idx, LATCH_page * bt->page_size);
901         if( bt->latchsets == MAP_FAILED )
902                 return bt_close (bt), NULL;
903 #else
904         flag = PAGE_READWRITE;
905         bt->halloc = CreateFileMapping(bt->idx, NULL, flag, 0, (BT_latchtable / (bt->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * bt->page_size, NULL);
906         if( !bt->halloc )
907                 return bt_close (bt), NULL;
908
909         flag = FILE_MAP_WRITE;
910         bt->latchmgr = MapViewOfFile(bt->halloc, flag, 0, 0, (BT_latchtable / (bt->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * bt->page_size);
911         if( !bt->latchmgr )
912                 return GetLastError(), bt_close (bt), NULL;
913
914         bt->latchsets = (void *)((char *)bt->latchmgr + LATCH_page * bt->page_size);
915 #endif
916
917 #ifdef unix
918         free (latchmgr);
919 #else
920         VirtualFree (latchmgr, 0, MEM_RELEASE);
921 #endif
922
923 #ifdef unix
924         bt->mem = malloc (6 * bt->page_size);
925 #else
926         bt->mem = VirtualAlloc(NULL, 6 * bt->page_size, MEM_COMMIT, PAGE_READWRITE);
927 #endif
928         bt->frame = (BtPage)bt->mem;
929         bt->cursor = (BtPage)(bt->mem + bt->page_size);
930         bt->page = (BtPage)(bt->mem + 2 * bt->page_size);
931         bt->alloc = (BtPage)(bt->mem + 3 * bt->page_size);
932         bt->temp = (BtPage)(bt->mem + 4 * bt->page_size);
933         bt->zero = (BtPage)(bt->mem + 5 * bt->page_size);
934
935         memset (bt->zero, 0, bt->page_size);
936         return bt;
937 }
938
939 // place write, read, or parent lock on requested page_no.
940
941 void bt_lockpage(BtLock mode, BtLatchSet *latch)
942 {
943         switch( mode ) {
944         case BtLockRead:
945                 bt_spinreadlock (latch->readwr);
946                 break;
947         case BtLockWrite:
948                 bt_spinwritelock (latch->readwr);
949                 break;
950         case BtLockAccess:
951                 bt_spinreadlock (latch->access);
952                 break;
953         case BtLockDelete:
954                 bt_spinwritelock (latch->access);
955                 break;
956         case BtLockParent:
957                 bt_spinwritelock (latch->parent);
958                 break;
959         }
960 }
961
962 // remove write, read, or parent lock on requested page
963
964 void bt_unlockpage(BtLock mode, BtLatchSet *latch)
965 {
966         switch( mode ) {
967         case BtLockRead:
968                 bt_spinreleaseread (latch->readwr);
969                 break;
970         case BtLockWrite:
971                 bt_spinreleasewrite (latch->readwr);
972                 break;
973         case BtLockAccess:
974                 bt_spinreleaseread (latch->access);
975                 break;
976         case BtLockDelete:
977                 bt_spinreleasewrite (latch->access);
978                 break;
979         case BtLockParent:
980                 bt_spinreleasewrite (latch->parent);
981                 break;
982         }
983 }
984
985 //      allocate a new page and write page into it
986
987 uid bt_newpage(BtDb *bt, BtPage page)
988 {
989 uid new_page;
990 int reuse;
991
992         //      lock allocation page
993
994         bt_spinwritelock(bt->latchmgr->lock);
995
996         // use empty chain first
997         // else allocate empty page
998
999         if( new_page = bt_getid(bt->latchmgr->alloc[1].right) ) {
1000                 if( bt_mappage (bt, &bt->temp, new_page) )
1001                         return 0;
1002                 bt_putid(bt->latchmgr->alloc[1].right, bt_getid(bt->temp->right));
1003                 reuse = 1;
1004         } else {
1005                 new_page = bt_getid(bt->latchmgr->alloc->right);
1006                 bt_putid(bt->latchmgr->alloc->right, new_page+1);
1007                 reuse = 0;
1008         }
1009
1010         bt_spinreleasewrite(bt->latchmgr->lock);
1011
1012         if( !bt->mapped_io )
1013           if( bt_update(bt, page, new_page) )
1014                 return 0;       //don't unlock on error
1015           else
1016                 return new_page;
1017
1018 #ifdef unix
1019         if( pwrite(bt->idx, page, bt->page_size, new_page << bt->page_bits) < bt->page_size )
1020                 return bt->err = BTERR_wrt, 0;
1021
1022         // if writing first page of pool block, zero last page in the block
1023
1024         if( !reuse && bt->hashmask > 0 && (new_page & bt->hashmask) == 0 )
1025         {
1026                 // use zero buffer to write zeros
1027                 if( pwrite(bt->idx,bt->zero, bt->page_size, (new_page | bt->hashmask) << bt->page_bits) < bt->page_size )
1028                         return bt->err = BTERR_wrt, 0;
1029         }
1030 #else
1031         //      bring new page into pool and copy page.
1032         //      this will extend the file into the new pages.
1033
1034         if( bt_mappage (bt, &bt->temp, new_page) )
1035                 return 0;
1036
1037         memcpy(bt->temp, page, bt->page_size);
1038 #endif
1039         return new_page;
1040 }
1041
1042 //  compare two keys, returning > 0, = 0, or < 0
1043 //  as the comparison value
1044
1045 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1046 {
1047 uint len1 = key1->len;
1048 int ans;
1049
1050         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1051                 return ans;
1052
1053         if( len1 > len2 )
1054                 return 1;
1055         if( len1 < len2 )
1056                 return -1;
1057
1058         return 0;
1059 }
1060
1061 //  Update current page of btree by writing file contents
1062 //      or flushing mapped area to disk.
1063
1064 BTERR bt_update (BtDb *bt, BtPage page, uid page_no)
1065 {
1066 off64_t off = page_no << bt->page_bits;
1067
1068 #ifdef unix
1069     if( !bt->mapped_io )
1070          if( pwrite(bt->idx, page, bt->page_size, off) != bt->page_size )
1071                  return bt->err = BTERR_wrt;
1072 #else
1073 uint amt[1];
1074         if( !bt->mapped_io )
1075         {
1076                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
1077                 if( !WriteFile (bt->idx, (char *)page, bt->page_size, amt, NULL) )
1078                         return GetLastError(), bt->err = BTERR_wrt;
1079
1080                 if( *amt < bt->page_size )
1081                         return GetLastError(), bt->err = BTERR_wrt;
1082         } 
1083         else if( bt->mode == BT_fl ) {
1084                         FlushViewOfFile(page, bt->page_size);
1085                         FlushFileBuffers(bt->idx);
1086         }
1087 #endif
1088         return 0;
1089 }
1090
1091 // find page in cache 
1092
1093 BtHash *bt_findhash(BtDb *bt, uid page_no)
1094 {
1095 BtHash *hash;
1096 uint idx;
1097
1098         // compute cache block first page and hash idx 
1099
1100         page_no &= ~bt->hashmask;
1101         idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
1102
1103         if( bt->cache[idx] ) 
1104                 hash = bt->nodes + bt->cache[idx];
1105         else
1106                 return NULL;
1107
1108         do if( hash->page_no == page_no )
1109                  break;
1110         while(hash = hash->hashnext );
1111
1112         return hash;
1113 }
1114
1115 // add page cache entry to hash index
1116
1117 void bt_linkhash(BtDb *bt, BtHash *node, uid page_no)
1118 {
1119 uint idx = (uint)(page_no >> bt->seg_bits) % bt->hashsize;
1120 BtHash *hash;
1121
1122         if( bt->cache[idx] ) {
1123                 node->hashnext = hash = bt->nodes + bt->cache[idx];
1124                 hash->hashprev = node;
1125         }
1126
1127         node->hashprev = NULL;
1128         bt->cache[idx] = (ushort)(node - bt->nodes);
1129 }
1130
1131 // remove cache entry from hash table
1132
1133 void bt_unlinkhash(BtDb *bt, BtHash *node)
1134 {
1135 uint idx = (uint)(node->page_no >> bt->seg_bits) % bt->hashsize;
1136 BtHash *hash;
1137
1138         // unlink node
1139         if( hash = node->hashprev )
1140                 hash->hashnext = node->hashnext;
1141         else if( hash = node->hashnext )
1142                 bt->cache[idx] = (ushort)(hash - bt->nodes);
1143         else
1144                 bt->cache[idx] = 0;
1145
1146         if( hash = node->hashnext )
1147                 hash->hashprev = node->hashprev;
1148 }
1149
1150 // add cache page to lru chain and map pages
1151
1152 BtPage bt_linklru(BtDb *bt, BtHash *hash, uid page_no)
1153 {
1154 int flag;
1155 off64_t off = (page_no & ~bt->hashmask) << bt->page_bits;
1156 off64_t limit = off + ((bt->hashmask+1) << bt->page_bits);
1157 BtHash *node;
1158
1159         memset(hash, 0, sizeof(BtHash));
1160         hash->page_no = (page_no & ~bt->hashmask);
1161         bt_linkhash(bt, hash, page_no);
1162
1163         if( node = hash->lrunext = bt->lrufirst )
1164                 node->lruprev = hash;
1165         else
1166                 bt->lrulast = hash;
1167
1168         bt->lrufirst = hash;
1169
1170 #ifdef unix
1171         flag = PROT_READ | ( bt->mode == BT_ro ? 0 : PROT_WRITE );
1172         hash->page = (BtPage)mmap (0, (bt->hashmask+1) << bt->page_bits, flag, MAP_SHARED, bt->idx, off);
1173         if( hash->page == MAP_FAILED )
1174                 return bt->err = BTERR_map, (BtPage)NULL;
1175
1176 #else
1177         flag = ( bt->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1178         hash->hmap = CreateFileMapping(bt->idx, NULL, flag,     (DWORD)(limit >> 32), (DWORD)limit, NULL);
1179         if( !hash->hmap )
1180                 return bt->err = BTERR_map, NULL;
1181
1182         flag = ( bt->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1183         hash->page = MapViewOfFile(hash->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->hashmask+1) << bt->page_bits);
1184         if( !hash->page )
1185                 return bt->err = BTERR_map, NULL;
1186 #endif
1187
1188         return (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
1189 }
1190
1191 //      find or place requested page in page-cache
1192 //      return memory address where page is located.
1193
1194 BtPage bt_hashpage(BtDb *bt, uid page_no)
1195 {
1196 BtHash *hash, *node, *next;
1197 BtPage page;
1198
1199         // find page in cache and move to top of lru list  
1200
1201         if( hash = bt_findhash(bt, page_no) ) {
1202                 page = (BtPage)((char*)hash->page + ((uint)(page_no & bt->hashmask) << bt->page_bits));
1203                 // swap node in lru list
1204                 if( node = hash->lruprev ) {
1205                         if( next = node->lrunext = hash->lrunext )
1206                                 next->lruprev = node;
1207                         else
1208                                 bt->lrulast = node;
1209
1210                         if( next = hash->lrunext = bt->lrufirst )
1211                                 next->lruprev = hash;
1212                         else
1213                                 return bt->err = BTERR_hash, (BtPage)NULL;
1214
1215                         hash->lruprev = NULL;
1216                         bt->lrufirst = hash;
1217                 }
1218                 return page;
1219         }
1220
1221         // map pages and add to cache entry
1222
1223         if( bt->nodecnt < bt->nodemax ) {
1224                 hash = bt->nodes + ++bt->nodecnt;
1225                 return bt_linklru(bt, hash, page_no);
1226         }
1227
1228         // hash table is already full, replace last lru entry from the cache
1229
1230         if( hash = bt->lrulast ) {
1231                 // unlink from lru list
1232                 if( node = bt->lrulast = hash->lruprev )
1233                         node->lrunext = NULL;
1234                 else
1235                         return bt->err = BTERR_hash, (BtPage)NULL;
1236
1237 #ifdef unix
1238                 munmap (hash->page, (bt->hashmask+1) << bt->page_bits);
1239 #else
1240                 FlushViewOfFile(hash->page, 0);
1241                 UnmapViewOfFile(hash->page);
1242                 CloseHandle(hash->hmap);
1243 #endif
1244                 // unlink from hash table
1245
1246                 bt_unlinkhash(bt, hash);
1247
1248                 // map and add to cache
1249
1250                 return bt_linklru(bt, hash, page_no);
1251         }
1252
1253         return bt->err = BTERR_hash, (BtPage)NULL;
1254 }
1255
1256 //  map a btree page onto current page
1257
1258 BTERR bt_mappage (BtDb *bt, BtPage *page, uid page_no)
1259 {
1260 off64_t off = page_no << bt->page_bits;
1261 #ifndef unix
1262 int amt[1];
1263 #endif
1264         
1265         if( bt->mapped_io ) {
1266                 bt->err = 0;
1267                 *page = bt_hashpage(bt, page_no);
1268                 return bt->err;
1269         }
1270 #ifdef unix
1271         if( pread(bt->idx, *page, bt->page_size, off) < bt->page_size )
1272                 return bt->err = BTERR_map;
1273 #else
1274         SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
1275
1276         if( !ReadFile(bt->idx, *page, bt->page_size, amt, NULL) )
1277                 return bt->err = BTERR_map;
1278
1279         if( *amt <  bt->page_size )
1280                 return bt->err = BTERR_map;
1281 #endif
1282         return 0;
1283 }
1284
1285 //      deallocate a deleted page 
1286 //      place on free chain out of allocator page
1287 //      call with page latched for Writing and Deleting
1288
1289 BTERR bt_freepage(BtDb *bt, uid page_no, BtPage page, BtLatchSet *latch)
1290 {
1291         if( bt_mappage (bt, &page, page_no) )
1292                 return bt->err;
1293
1294         //      lock allocation page
1295
1296         bt_spinwritelock (bt->latchmgr->lock);
1297
1298         //      store chain in second right
1299         bt_putid(page->right, bt_getid(bt->latchmgr->alloc[1].right));
1300         bt_putid(bt->latchmgr->alloc[1].right, page_no);
1301         page->free = 1;
1302
1303         if( bt_update(bt, page, page_no) )
1304                 return bt->err;
1305
1306         // unlock released page
1307
1308         bt_unlockpage (BtLockDelete, latch);
1309         bt_unlockpage (BtLockWrite, latch);
1310         bt_unpinlatch (latch);
1311
1312         // unlock allocation page
1313
1314         bt_spinreleasewrite (bt->latchmgr->lock);
1315         return 0;
1316 }
1317
1318 //  find slot in page for given key at a given level
1319
1320 int bt_findslot (BtDb *bt, unsigned char *key, uint len)
1321 {
1322 uint diff, higher = bt->page->cnt, low = 1, slot;
1323 uint good = 0;
1324
1325         //      make stopper key an infinite fence value
1326
1327         if( bt_getid (bt->page->right) )
1328                 higher++;
1329         else
1330                 good++;
1331
1332         //      low is the lowest candidate, higher is already
1333         //      tested as .ge. the given key, loop ends when they meet
1334
1335         while( diff = higher - low ) {
1336                 slot = low + ( diff >> 1 );
1337                 if( keycmp (keyptr(bt->page, slot), key, len) < 0 )
1338                         low = slot + 1;
1339                 else
1340                         higher = slot, good++;
1341         }
1342
1343         //      return zero if key is on right link page
1344
1345         return good ? higher : 0;
1346 }
1347
1348 //  find and load page at given level for given key
1349 //      leave page rd or wr locked as requested
1350
1351 int bt_loadpage (BtDb *bt, unsigned char *key, uint len, uint lvl, uint lock)
1352 {
1353 uid page_no = ROOT_page, prevpage = 0;
1354 uint drill = 0xff, slot;
1355 BtLatchSet *prevlatch;
1356 uint mode, prevmode;
1357
1358   //  start at root of btree and drill down
1359
1360   do {
1361         // determine lock mode of drill level
1362         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1363
1364         bt->latch = bt_pinlatch(bt, page_no);
1365         bt->page_no = page_no;
1366
1367         // obtain access lock using lock chaining
1368
1369         if( page_no > ROOT_page )
1370                 bt_lockpage(BtLockAccess, bt->latch);
1371
1372         if( prevpage ) {
1373                 bt_unlockpage(prevmode, prevlatch);
1374                 bt_unpinlatch(prevlatch);
1375                 prevpage = 0;
1376         }
1377
1378         // obtain read lock using lock chaining
1379
1380         bt_lockpage(mode, bt->latch);
1381
1382         if( page_no > ROOT_page )
1383                 bt_unlockpage(BtLockAccess, bt->latch);
1384
1385         //      map/obtain page contents
1386
1387         if( bt_mappage (bt, &bt->page, page_no) )
1388                 return 0;
1389
1390         // re-read and re-lock root after determining actual level of root
1391
1392         if( bt->page->lvl < drill) {
1393                 if( bt->page_no != ROOT_page )
1394                         return bt->err = BTERR_struct, 0;
1395                         
1396                 drill = bt->page->lvl;
1397
1398                 if( lock != BtLockRead && drill == lvl ) {
1399                         bt_unlockpage(mode, bt->latch);
1400                         bt_unpinlatch(bt->latch);
1401                         continue;
1402                 }
1403         }
1404
1405         prevpage = bt->page_no;
1406         prevlatch = bt->latch;
1407         prevmode = mode;
1408
1409         //  find key on page at this level
1410         //  and descend to requested level
1411
1412         if( !bt->page->kill )
1413          if( slot = bt_findslot (bt, key, len) ) {
1414           if( drill == lvl )
1415                 return slot;
1416
1417           while( slotptr(bt->page, slot)->dead )
1418                 if( slot++ < bt->page->cnt )
1419                         continue;
1420                 else
1421                         goto slideright;
1422
1423           page_no = bt_getid(slotptr(bt->page, slot)->id);
1424           drill--;
1425           continue;
1426          }
1427
1428         //  or slide right into next page
1429
1430 slideright:
1431         page_no = bt_getid(bt->page->right);
1432
1433   } while( page_no );
1434
1435   // return error on end of right chain
1436
1437   bt->err = BTERR_eof;
1438   return 0;     // return error
1439 }
1440
1441 //      a fence key was deleted from a page
1442 //      push new fence value upwards
1443
1444 BTERR bt_fixfence (BtDb *bt, uid page_no, uint lvl)
1445 {
1446 unsigned char leftkey[256], rightkey[256];
1447 BtLatchSet *latch = bt->latch;
1448 BtKey ptr;
1449
1450         // remove deleted key, the old fence value
1451
1452         ptr = keyptr(bt->page, bt->page->cnt);
1453         memcpy(rightkey, ptr, ptr->len + 1);
1454
1455         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1456         bt->page->dirty = 1;
1457
1458         ptr = keyptr(bt->page, bt->page->cnt);
1459         memcpy(leftkey, ptr, ptr->len + 1);
1460
1461         if( bt_update (bt, bt->page, page_no) )
1462                 return bt->err;
1463
1464         bt_lockpage (BtLockParent, latch);
1465         bt_unlockpage (BtLockWrite, latch);
1466
1467         //  insert new (now smaller) fence key
1468
1469         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl + 1, page_no, time(NULL)) )
1470                 return bt->err;
1471
1472         //  remove old (larger) fence key
1473
1474         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl + 1) )
1475                 return bt->err;
1476
1477         bt_unlockpage (BtLockParent, latch);
1478         bt_unpinlatch (latch);
1479         return 0;
1480 }
1481
1482 //      root has a single child
1483 //      collapse a level from the btree
1484 //      call with root locked in bt->page
1485
1486 BTERR bt_collapseroot (BtDb *bt, BtPage root)
1487 {
1488 BtLatchSet *latch;
1489 uid child;
1490 uint idx;
1491
1492         // find the child entry
1493         //      and promote to new root
1494
1495   do {
1496         for( idx = 0; idx++ < root->cnt; )
1497           if( !slotptr(root, idx)->dead )
1498                 break;
1499
1500         child = bt_getid (slotptr(root, idx)->id);
1501         latch = bt_pinlatch (bt, child);
1502
1503         bt_lockpage (BtLockDelete, latch);
1504         bt_lockpage (BtLockWrite, latch);
1505
1506         if( bt_mappage (bt, &bt->temp, child) )
1507                 return bt->err;
1508
1509         memcpy (root, bt->temp, bt->page_size);
1510
1511         if( bt_update (bt, root, ROOT_page) )
1512                 return bt->err;
1513
1514         if( bt_freepage (bt, child, bt->temp, latch) )
1515                 return bt->err;
1516
1517   } while( root->lvl > 1 && root->act == 1 );
1518
1519   bt_unlockpage (BtLockWrite, bt->latch);
1520   bt_unpinlatch (bt->latch);
1521   return 0;
1522 }
1523
1524 //  find and delete key on page by marking delete flag bit
1525 //  when page becomes empty, delete it
1526
1527 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1528 {
1529 unsigned char lowerkey[256], higherkey[256];
1530 uint slot, dirty = 0, idx, fence, found;
1531 BtLatchSet *latch, *rlatch;
1532 uid page_no, right;
1533 BtKey ptr;
1534
1535         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1536                 ptr = keyptr(bt->page, slot);
1537         else
1538                 return bt->err;
1539
1540         // are we deleting a fence slot?
1541
1542         fence = slot == bt->page->cnt;
1543
1544         // if key is found delete it, otherwise ignore request
1545
1546         if( found = !keycmp (ptr, key, len) )
1547           if( found = slotptr(bt->page, slot)->dead == 0 ) {
1548                 dirty = slotptr(bt->page,slot)->dead = 1;
1549                 bt->page->dirty = 1;
1550                 bt->page->act--;
1551
1552                 // collapse empty slots
1553
1554                 while( idx = bt->page->cnt - 1 )
1555                   if( slotptr(bt->page, idx)->dead ) {
1556                         *slotptr(bt->page, idx) = *slotptr(bt->page, idx + 1);
1557                         memset (slotptr(bt->page, bt->page->cnt--), 0, sizeof(BtSlot));
1558                   } else
1559                         break;
1560           }
1561
1562         right = bt_getid(bt->page->right);
1563         page_no = bt->page_no;
1564         latch = bt->latch;
1565
1566         if( !dirty ) {
1567           if( lvl )
1568                 return bt_abort (bt, bt->page, page_no, BTERR_notfound);
1569           bt_unlockpage(BtLockWrite, latch);
1570           bt_unpinlatch (latch);
1571           return bt->found = found, 0;
1572         }
1573
1574         //      did we delete a fence key in an upper level?
1575
1576         if( lvl && bt->page->act && fence )
1577           if( bt_fixfence (bt, page_no, lvl) )
1578                 return bt->err;
1579           else
1580                 return bt->found = found, 0;
1581
1582         //      is this a collapsed root?
1583
1584         if( lvl > 1 && page_no == ROOT_page && bt->page->act == 1 )
1585           if( bt_collapseroot (bt, bt->page) )
1586                 return bt->err;
1587           else
1588                 return bt->found = found, 0;
1589
1590         // return if page is not empty
1591
1592         if( bt->page->act ) {
1593           if( bt_update(bt, bt->page, page_no) )
1594                 return bt->err;
1595           bt_unlockpage(BtLockWrite, latch);
1596           bt_unpinlatch (latch);
1597           return bt->found = found, 0;
1598         }
1599
1600         // cache copy of fence key
1601         //      in order to find parent
1602
1603         ptr = keyptr(bt->page, bt->page->cnt);
1604         memcpy(lowerkey, ptr, ptr->len + 1);
1605
1606         // obtain lock on right page
1607
1608         rlatch = bt_pinlatch (bt, right);
1609         bt_lockpage(BtLockWrite, rlatch);
1610
1611         if( bt_mappage (bt, &bt->temp, right) )
1612                 return bt->err;
1613
1614         if( bt->temp->kill ) {
1615                 bt_abort(bt, bt->temp, right, 0);
1616                 return bt_abort(bt, bt->page, bt->page_no, BTERR_kill);
1617         }
1618
1619         // pull contents of next page into current empty page 
1620
1621         memcpy (bt->page, bt->temp, bt->page_size);
1622
1623         //      cache copy of key to update
1624
1625         ptr = keyptr(bt->temp, bt->temp->cnt);
1626         memcpy(higherkey, ptr, ptr->len + 1);
1627
1628         //  Mark right page as deleted and point it to left page
1629         //      until we can post updates at higher level.
1630
1631         bt_putid(bt->temp->right, page_no);
1632         bt->temp->kill = 1;
1633
1634         if( bt_update(bt, bt->page, page_no) )
1635                 return bt->err;
1636
1637         if( bt_update(bt, bt->temp, right) )
1638                 return bt->err;
1639
1640         bt_lockpage(BtLockParent, latch);
1641         bt_unlockpage(BtLockWrite, latch);
1642
1643         bt_lockpage(BtLockParent, rlatch);
1644         bt_unlockpage(BtLockWrite, rlatch);
1645
1646         //  redirect higher key directly to consolidated node
1647
1648         if( bt_insertkey (bt, higherkey+1, *higherkey, lvl+1, page_no, time(NULL)) )
1649                 return bt->err;
1650
1651         //  delete old lower key to consolidated node
1652
1653         if( bt_deletekey (bt, lowerkey + 1, *lowerkey, lvl + 1) )
1654                 return bt->err;
1655
1656         //  obtain write & delete lock on deleted node
1657         //      add right block to free chain
1658
1659         bt_lockpage(BtLockDelete, rlatch);
1660         bt_lockpage(BtLockWrite, rlatch);
1661         bt_unlockpage(BtLockParent, rlatch);
1662
1663         if( bt_freepage (bt, right, bt->temp, rlatch) )
1664                 return bt->err;
1665
1666         bt_unlockpage(BtLockParent, latch);
1667         bt_unpinlatch(latch);
1668         return 0;
1669 }
1670
1671 //      find key in leaf level and return row-id
1672
1673 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
1674 {
1675 uint  slot;
1676 BtKey ptr;
1677 uid id;
1678
1679         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
1680                 ptr = keyptr(bt->page, slot);
1681         else
1682                 return 0;
1683
1684         // if key exists, return row-id
1685         //      otherwise return 0
1686
1687         if( ptr->len == len && !memcmp (ptr->key, key, len) )
1688                 id = bt_getid(slotptr(bt->page,slot)->id);
1689         else
1690                 id = 0;
1691
1692         bt_unlockpage (BtLockRead, bt->latch);
1693         bt_unpinlatch (bt->latch);
1694         return id;
1695 }
1696
1697 //      check page for space available,
1698 //      clean if necessary and return
1699 //      0 - page needs splitting
1700 //      >0 - go ahead with new slot
1701  
1702 uint bt_cleanpage(BtDb *bt, uint amt, uint slot)
1703 {
1704 uint nxt = bt->page_size;
1705 BtPage page = bt->page;
1706 uint cnt = 0, idx = 0;
1707 uint max = page->cnt;
1708 uint newslot = slot;
1709 BtKey key;
1710 int ret;
1711
1712         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1713                 return slot;
1714
1715         //      skip cleanup if nothing to reclaim
1716
1717         if( !page->dirty )
1718                 return 0;
1719
1720         memcpy (bt->frame, page, bt->page_size);
1721
1722         // skip page info and set rest of page to zero
1723
1724         memset (page+1, 0, bt->page_size - sizeof(*page));
1725         page->act = 0;
1726
1727         while( cnt++ < max ) {
1728                 if( cnt == slot )
1729                         newslot = idx + 1;
1730                 // always leave fence key in list
1731                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1732                         continue;
1733
1734                 // copy key
1735                 key = keyptr(bt->frame, cnt);
1736                 nxt -= key->len + 1;
1737                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1738
1739                 // copy slot
1740                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
1741                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1742                         page->act++;
1743                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1744                 slotptr(page, idx)->off = nxt;
1745         }
1746
1747         page->min = nxt;
1748         page->cnt = idx;
1749
1750         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
1751                 return newslot;
1752
1753         return 0;
1754 }
1755
1756 // split the root and raise the height of the btree
1757
1758 BTERR bt_splitroot(BtDb *bt, unsigned char *leftkey, uid page_no2)
1759 {
1760 uint nxt = bt->page_size;
1761 BtPage root = bt->page;
1762 uid right;
1763
1764         //  Obtain an empty page to use, and copy the current
1765         //  root contents into it
1766
1767         if( !(right = bt_newpage(bt, root)) )
1768                 return bt->err;
1769
1770         // preserve the page info at the bottom
1771         // and set rest to zero
1772
1773         memset(root+1, 0, bt->page_size - sizeof(*root));
1774
1775         // insert first key on newroot page
1776
1777         nxt -= *leftkey + 1;
1778         memcpy ((unsigned char *)root + nxt, leftkey, *leftkey + 1);
1779         bt_putid(slotptr(root, 1)->id, right);
1780         slotptr(root, 1)->off = nxt;
1781         
1782         // insert second key on newroot page
1783         // and increase the root height
1784
1785         nxt -= 3;
1786         ((unsigned char *)root)[nxt] = 2;
1787         ((unsigned char *)root)[nxt+1] = 0xff;
1788         ((unsigned char *)root)[nxt+2] = 0xff;
1789         bt_putid(slotptr(root, 2)->id, page_no2);
1790         slotptr(root, 2)->off = nxt;
1791
1792         bt_putid(root->right, 0);
1793         root->min = nxt;                // reset lowest used offset and key count
1794         root->cnt = 2;
1795         root->act = 2;
1796         root->lvl++;
1797
1798         // update and release root (bt->page)
1799
1800         if( bt_update(bt, root, bt->page_no) )
1801                 return bt->err;
1802
1803         bt_unlockpage(BtLockWrite, bt->latch);
1804         bt_unpinlatch(bt->latch);
1805         return 0;
1806 }
1807
1808 //  split already locked full node
1809 //      return unlocked.
1810
1811 BTERR bt_splitpage (BtDb *bt)
1812 {
1813 uint cnt = 0, idx = 0, max, nxt = bt->page_size;
1814 unsigned char fencekey[256], rightkey[256];
1815 uid page_no = bt->page_no, right;
1816 BtLatchSet *latch, *rlatch;
1817 BtPage page = bt->page;
1818 uint lvl = page->lvl;
1819 BtKey key;
1820
1821         latch = bt->latch;
1822
1823         //  split higher half of keys to bt->frame
1824         //      the last key (fence key) might be dead
1825
1826         memset (bt->frame, 0, bt->page_size);
1827         max = page->cnt;
1828         cnt = max / 2;
1829         idx = 0;
1830
1831         while( cnt++ < max ) {
1832                 key = keyptr(page, cnt);
1833                 nxt -= key->len + 1;
1834                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1835                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(page,cnt)->id, BtId);
1836                 if( !(slotptr(bt->frame, idx)->dead = slotptr(page, cnt)->dead) )
1837                         bt->frame->act++;
1838                 slotptr(bt->frame, idx)->tod = slotptr(page, cnt)->tod;
1839                 slotptr(bt->frame, idx)->off = nxt;
1840         }
1841
1842         //      remember fence key for new right page
1843
1844         memcpy (rightkey, key, key->len + 1);
1845
1846         bt->frame->bits = bt->page_bits;
1847         bt->frame->min = nxt;
1848         bt->frame->cnt = idx;
1849         bt->frame->lvl = lvl;
1850
1851         // link right node
1852
1853         if( page_no > ROOT_page )
1854                 memcpy (bt->frame->right, page->right, BtId);
1855
1856         //      get new free page and write frame to it.
1857
1858         if( !(right = bt_newpage(bt, bt->frame)) )
1859                 return bt->err;
1860
1861         //      update lower keys to continue in old page
1862
1863         memcpy (bt->frame, page, bt->page_size);
1864         memset (page+1, 0, bt->page_size - sizeof(*page));
1865         nxt = bt->page_size;
1866         page->dirty = 0;
1867         page->act = 0;
1868         cnt = 0;
1869         idx = 0;
1870
1871         //  assemble page of smaller keys
1872         //      (they're all active keys)
1873
1874         while( cnt++ < max / 2 ) {
1875                 key = keyptr(bt->frame, cnt);
1876                 nxt -= key->len + 1;
1877                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1878                 memcpy(slotptr(page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
1879                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
1880                 slotptr(page, idx)->off = nxt;
1881                 page->act++;
1882         }
1883
1884         // remember fence key for smaller page
1885
1886         memcpy (fencekey, key, key->len + 1);
1887
1888         bt_putid(page->right, right);
1889         page->min = nxt;
1890         page->cnt = idx;
1891
1892         // if current page is the root page, split it
1893
1894         if( page_no == ROOT_page )
1895                 return bt_splitroot (bt, fencekey, right);
1896
1897         //      lock right page
1898
1899         rlatch = bt_pinlatch (bt, right);
1900         bt_lockpage (BtLockParent, rlatch);
1901
1902         // update left (containing) node
1903
1904         if( bt_update(bt, page, page_no) )
1905                 return bt->err;
1906
1907         bt_lockpage (BtLockParent, latch);
1908         bt_unlockpage (BtLockWrite, latch);
1909
1910         // insert new fence for reformulated left block
1911
1912         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, page_no, time(NULL)) )
1913                 return bt->err;
1914
1915         //      switch fence for right block of larger keys to new right page
1916
1917         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, right, time(NULL)) )
1918                 return bt->err;
1919
1920         bt_unlockpage (BtLockParent, latch);
1921         bt_unlockpage (BtLockParent, rlatch);
1922
1923         bt_unpinlatch (rlatch);
1924         bt_unpinlatch (latch);
1925         return 0;
1926 }
1927
1928 //  Insert new key into the btree at requested level.
1929 //  Pages are unlocked at exit.
1930
1931 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, uid id, uint tod)
1932 {
1933 uint slot, idx;
1934 BtPage page;
1935 BtKey ptr;
1936
1937   while( 1 ) {
1938         if( slot = bt_loadpage (bt, key, len, lvl, BtLockWrite) )
1939                 ptr = keyptr(bt->page, slot);
1940         else
1941         {
1942                 if( !bt->err )
1943                         bt->err = BTERR_ovflw;
1944                 return bt->err;
1945         }
1946
1947         // if key already exists, update id and return
1948
1949         page = bt->page;
1950
1951         if( !keycmp (ptr, key, len) ) {
1952           if( slotptr(page, slot)->dead )
1953                 page->act++;
1954           slotptr(page, slot)->dead = 0;
1955           slotptr(page, slot)->tod = tod;
1956           bt_putid(slotptr(page,slot)->id, id);
1957           if( bt_update(bt, bt->page, bt->page_no) )
1958                 return bt->err;
1959           bt_unlockpage(BtLockWrite, bt->latch);
1960           bt_unpinlatch (bt->latch);
1961           return 0;
1962         }
1963
1964         // check if page has enough space
1965
1966         if( slot = bt_cleanpage (bt, len, slot) )
1967                 break;
1968
1969         if( bt_splitpage (bt) )
1970                 return bt->err;
1971   }
1972
1973   // calculate next available slot and copy key into page
1974
1975   page->min -= len + 1; // reset lowest used offset
1976   ((unsigned char *)page)[page->min] = len;
1977   memcpy ((unsigned char *)page + page->min +1, key, len );
1978
1979   for( idx = slot; idx < page->cnt; idx++ )
1980         if( slotptr(page, idx)->dead )
1981                 break;
1982
1983   // now insert key into array before slot
1984   // preserving the fence slot
1985
1986   if( idx == page->cnt )
1987         idx++, page->cnt++;
1988
1989   page->act++;
1990
1991   while( idx > slot )
1992         *slotptr(page, idx) = *slotptr(page, idx -1), idx--;
1993
1994   bt_putid(slotptr(page,slot)->id, id);
1995   slotptr(page, slot)->off = page->min;
1996   slotptr(page, slot)->tod = tod;
1997   slotptr(page, slot)->dead = 0;
1998
1999   if( bt_update(bt, bt->page, bt->page_no) )
2000           return bt->err;
2001
2002   bt_unlockpage(BtLockWrite, bt->latch);
2003   bt_unpinlatch(bt->latch);
2004   return 0;
2005 }
2006
2007 //  cache page of keys into cursor and return starting slot for given key
2008
2009 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2010 {
2011 uint slot;
2012
2013         // cache page for retrieval
2014
2015         if( slot = bt_loadpage (bt, key, len, 0, BtLockRead) )
2016           memcpy (bt->cursor, bt->page, bt->page_size);
2017         else
2018           return 0;
2019
2020         bt_unlockpage(BtLockRead, bt->latch);
2021         bt->cursor_page = bt->page_no;
2022         bt_unpinlatch (bt->latch);
2023         return slot;
2024 }
2025
2026 //  return next slot for cursor page
2027 //  or slide cursor right into next page
2028
2029 uint bt_nextkey (BtDb *bt, uint slot)
2030 {
2031 BtLatchSet *latch;
2032 off64_t right;
2033
2034   do {
2035         right = bt_getid(bt->cursor->right);
2036
2037         while( slot++ < bt->cursor->cnt )
2038           if( slotptr(bt->cursor,slot)->dead )
2039                 continue;
2040           else if( right || (slot < bt->cursor->cnt))
2041                 return slot;
2042           else
2043                 break;
2044
2045         if( !right )
2046                 break;
2047
2048         bt->cursor_page = right;
2049         latch = bt_pinlatch (bt, right);
2050     bt_lockpage(BtLockRead, latch);
2051
2052         if( bt_mappage (bt, &bt->page, right) )
2053                 return 0;
2054
2055         memcpy (bt->cursor, bt->page, bt->page_size);
2056         bt_unlockpage(BtLockRead, latch);
2057         bt_unpinlatch (latch);
2058         slot = 0;
2059   } while( 1 );
2060
2061   return bt->err = 0;
2062 }
2063
2064 BtKey bt_key(BtDb *bt, uint slot)
2065 {
2066         return keyptr(bt->cursor, slot);
2067 }
2068
2069 uid bt_uid(BtDb *bt, uint slot)
2070 {
2071         return bt_getid(slotptr(bt->cursor,slot)->id);
2072 }
2073
2074 uint bt_tod(BtDb *bt, uint slot)
2075 {
2076         return slotptr(bt->cursor,slot)->tod;
2077 }
2078
2079
2080 #ifdef STANDALONE
2081
2082 uint bt_audit (BtDb *bt)
2083 {
2084 ushort idx, hashidx;
2085 uid next, page_no;
2086 BtLatchSet *latch;
2087 uint cnt = 0;
2088 BtKey ptr;
2089
2090 #ifdef unix
2091         if( *(uint *)(bt->latchmgr->lock) )
2092                 fprintf(stderr, "Alloc page locked\n");
2093         *(uint *)(bt->latchmgr->lock) = 0;
2094
2095         for( idx = 1; idx < bt->latchmgr->latchdeployed; idx++ ) {
2096                 latch = bt->latchsets + idx;
2097                 if( *(uint *)latch->readwr )
2098                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2099                 *(uint *)latch->readwr = 0;
2100
2101                 if( *(uint *)latch->access )
2102                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2103                 *(uint *)latch->access = 0;
2104
2105                 if( *(uint *)latch->parent )
2106                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2107                 *(uint *)latch->parent = 0;
2108
2109                 if( latch->pin ) {
2110                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2111                         latch->pin = 0;
2112                 }
2113         }
2114
2115         for( hashidx = 0; hashidx < bt->latchmgr->latchhash; hashidx++ ) {
2116           if( *(uint *)(bt->latchmgr->table[hashidx].latch) )
2117                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2118
2119           *(uint *)(bt->latchmgr->table[hashidx].latch) = 0;
2120
2121           if( idx = bt->latchmgr->table[hashidx].slot ) do {
2122                 latch = bt->latchsets + idx;
2123                 if( *(uint *)latch->busy )
2124                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2125                 *(uint *)latch->busy = 0;
2126                 if( latch->hash != hashidx )
2127                         fprintf(stderr, "latchset %d wrong hashidx\n", idx);
2128                 if( latch->pin )
2129                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2130           } while( idx = latch->next );
2131         }
2132
2133         next = bt->latchmgr->nlatchpage + LATCH_page;
2134         page_no = LEAF_page;
2135
2136         while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2137                 pread (bt->idx, bt->frame, bt->page_size, page_no << bt->page_bits);
2138                 if( !bt->frame->free ) {
2139                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2140                   ptr = keyptr(bt->frame, idx+1);
2141                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2142                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2143                  }
2144                  if( !bt->frame->lvl )
2145                         cnt += bt->frame->act;
2146                 }
2147
2148                 if( page_no > LEAF_page )
2149                         next = page_no + 1;
2150                 page_no = next;
2151         }
2152         return cnt - 1;
2153 #endif
2154 }
2155
2156 #ifndef unix
2157 double getCpuTime(int type)
2158 {
2159 FILETIME crtime[1];
2160 FILETIME xittime[1];
2161 FILETIME systime[1];
2162 FILETIME usrtime[1];
2163 SYSTEMTIME timeconv[1];
2164 double ans = 0;
2165
2166         memset (timeconv, 0, sizeof(SYSTEMTIME));
2167
2168         switch( type ) {
2169         case 0:
2170                 GetSystemTimeAsFileTime (xittime);
2171                 FileTimeToSystemTime (xittime, timeconv);
2172                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2173                 break;
2174         case 1:
2175                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2176                 FileTimeToSystemTime (usrtime, timeconv);
2177                 break;
2178         case 2:
2179                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2180                 FileTimeToSystemTime (systime, timeconv);
2181                 break;
2182         }
2183
2184         ans += (double)timeconv->wHour * 3600;
2185         ans += (double)timeconv->wMinute * 60;
2186         ans += (double)timeconv->wSecond;
2187         ans += (double)timeconv->wMilliseconds / 1000;
2188         return ans;
2189 }
2190 #else
2191 #include <time.h>
2192 #include <sys/resource.h>
2193
2194 double getCpuTime(int type)
2195 {
2196 struct rusage used[1];
2197 struct timeval tv[1];
2198
2199         switch( type ) {
2200         case 0:
2201                 gettimeofday(tv, NULL);
2202                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2203
2204         case 1:
2205                 getrusage(RUSAGE_SELF, used);
2206                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2207
2208         case 2:
2209                 getrusage(RUSAGE_SELF, used);
2210                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2211         }
2212
2213         return 0;
2214 }
2215 #endif
2216
2217 //  standalone program to index file of keys
2218 //  then list them onto std-out
2219
2220 int main (int argc, char **argv)
2221 {
2222 uint slot, line = 0, off = 0, found = 0;
2223 int ch, cnt = 0, bits = 12;
2224 unsigned char key[256];
2225 double done, start;
2226 uid next, page_no;
2227 uint pgblk = 0;
2228 float elapsed;
2229 time_t tod[1];
2230 uint scan = 0;
2231 uint len = 0;
2232 uint map = 0;
2233 BtKey ptr;
2234 BtDb *bt;
2235 FILE *in;
2236
2237         if( argc < 4 ) {
2238                 fprintf (stderr, "Usage: %s idx_file src_file Read/Write/Scan/Delete/Find [page_bits mapped_pool_segments pages_per_segment start_line_number]\n", argv[0]);
2239                 fprintf (stderr, "  page_bits: size of btree page in bits\n");
2240                 fprintf (stderr, "  mapped_pool_segments: size of buffer pool in segments\n");
2241                 fprintf (stderr, "  pages_per_segment: size of buffer pool segment in pages in bits\n");
2242                 exit(0);
2243         }
2244
2245         start = getCpuTime(0);
2246         time(tod);
2247
2248         if( argc > 4 )
2249                 bits = atoi(argv[4]);
2250
2251         if( argc > 5 )
2252                 map = atoi(argv[5]);
2253
2254         if( map > 65536 )
2255                 fprintf (stderr, "Warning: buffer_pool > 65536 segments\n");
2256
2257         if( map && map < 8 )
2258                 fprintf (stderr, "Buffer_pool too small\n");
2259
2260         if( argc > 6 )
2261                 pgblk = atoi(argv[6]);
2262
2263         if( bits + pgblk > 30 )
2264                 fprintf (stderr, "Warning: very large buffer pool segment size\n");
2265
2266         if( argc > 7 )
2267                 off = atoi(argv[7]);
2268
2269         bt = bt_open ((argv[1]), BT_rw, bits, map, pgblk, map /  8);
2270
2271         if( !bt ) {
2272                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2273                 exit (1);
2274         }
2275
2276         switch(argv[3][0]| 0x20)
2277         {
2278         case 'a':
2279                 fprintf(stderr, "started audit for %s\n", argv[2]);
2280                 cnt = bt_audit (bt);
2281                 fprintf(stderr, "finished audit for %s, %d keys\n", argv[2], cnt);
2282                 break;
2283
2284         case 'w':
2285                 fprintf(stderr, "started indexing for %s\n", argv[2]);
2286                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2287                   while( ch = getc(in), ch != EOF )
2288                         if( ch == '\n' )
2289                         {
2290                           if( off )
2291                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2292
2293                           if( bt_insertkey (bt, key, len, 0, ++line, *tod) )
2294                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2295                           len = 0;
2296                         }
2297                         else if( len < 245 )
2298                                 key[len++] = ch;
2299                 fprintf(stderr, "finished adding keys for %s, %d \n", argv[2], line);
2300                 break;
2301
2302         case 'd':
2303                 fprintf(stderr, "started deleting keys for %s\n", argv[2]);
2304                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2305                   while( ch = getc(in), ch != EOF )
2306                         if( ch == '\n' )
2307                         {
2308                           if( off )
2309                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2310                           line++;
2311                           if( bt_deletekey (bt, key, len, 0) )
2312                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2313                           len = 0;
2314                         }
2315                         else if( len < 245 )
2316                                 key[len++] = ch;
2317                 fprintf(stderr, "finished deleting keys for %s, %d \n", argv[2], line);
2318                 break;
2319
2320         case 'f':
2321                 fprintf(stderr, "started finding keys for %s\n", argv[2]);
2322                 if( argc > 2 && (in = fopen (argv[2], "rb")) )
2323                   while( ch = getc(in), ch != EOF )
2324                         if( ch == '\n' )
2325                         {
2326                           if( off )
2327                                 sprintf((char *)key+len, "%.9d", line + off), len += 9;
2328                           line++;
2329                           if( bt_findkey (bt, key, len) )
2330                                 found++;
2331                           else if( bt->err )
2332                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2333                           len = 0;
2334                         }
2335                         else if( len < 245 )
2336                                 key[len++] = ch;
2337                 fprintf(stderr, "finished search of %d keys for %s, found %d\n", line, argv[2], found);
2338                 break;
2339
2340         case 's':
2341                 scan++;
2342
2343         case 'c':
2344           cnt = 0;
2345
2346           fprintf(stderr, "started reading\n");
2347
2348           next = bt->latchmgr->nlatchpage + LATCH_page;
2349           page_no = LEAF_page;
2350
2351           while( page_no < bt_getid(bt->latchmgr->alloc->right) ) {
2352           uid off = page_no << bt->page_bits;
2353 #ifdef unix
2354                 pread (bt->idx, bt->frame, bt->page_size, off);
2355 #else
2356                 DWORD amt[1];
2357
2358                 SetFilePointer (bt->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2359
2360                 if( !ReadFile(bt->idx, bt->frame, bt->page_size, amt, NULL))
2361                         return bt->err = BTERR_map;
2362
2363                 if( *amt <  bt->page_size )
2364                         return bt->err = BTERR_map;
2365 #endif
2366                 if( !bt->frame->free && !bt->frame->lvl )
2367                         cnt += bt->frame->act;
2368                 if( page_no > LEAF_page )
2369                         next = page_no + 1;
2370                 page_no = next;
2371           }
2372                 
2373           cnt--;        // remove stopper key
2374           fprintf(stderr, " Total keys read %d\n", cnt);
2375           break;
2376         }
2377
2378         done = getCpuTime(0);
2379         elapsed = (float)(done - start);
2380         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2381         elapsed = getCpuTime(1);
2382         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2383         elapsed = getCpuTime(2);
2384         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2385         return 0;
2386 }
2387
2388 #endif  //STANDALONE