]> pd.if.org Git - btree/blob - threadskv5.c
49047e71cec6318c7980b4cc81e3c5e731484eb7
[btree] / threadskv5.c
1 // btree version threadskv5 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      and bi-directional cursors
7
8 // 08 SEP 2014
9
10 // author: karl malbrain, malbrain@cal.berkeley.edu
11
12 /*
13 This work, including the source code, documentation
14 and related data, is placed into the public domain.
15
16 The orginal author is Karl Malbrain.
17
18 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
19 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
20 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
21 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
22 RESULTING FROM THE USE, MODIFICATION, OR
23 REDISTRIBUTION OF THIS SOFTWARE.
24 */
25
26 // Please see the project home page for documentation
27 // code.google.com/p/high-concurrency-btree
28
29 #define _FILE_OFFSET_BITS 64
30 #define _LARGEFILE64_SOURCE
31
32 #ifdef linux
33 #define _GNU_SOURCE
34 #endif
35
36 #ifdef unix
37 #include <unistd.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <fcntl.h>
41 #include <sys/time.h>
42 #include <sys/mman.h>
43 #include <errno.h>
44 #include <pthread.h>
45 #else
46 #define WIN32_LEAN_AND_MEAN
47 #include <windows.h>
48 #include <stdio.h>
49 #include <stdlib.h>
50 #include <time.h>
51 #include <fcntl.h>
52 #include <process.h>
53 #include <intrin.h>
54 #endif
55
56 #include <memory.h>
57 #include <string.h>
58 #include <stddef.h>
59
60 typedef unsigned long long      uid;
61
62 #ifndef unix
63 typedef unsigned long long      off64_t;
64 typedef unsigned short          ushort;
65 typedef unsigned int            uint;
66 #endif
67
68 #define BT_latchtable   128                                     // number of latch manager slots
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 /*
79 There are five lock types for each node in three independent sets: 
80 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
81 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
82 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
83 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
84 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
85 */
86
87 typedef enum{
88         BtLockAccess,
89         BtLockDelete,
90         BtLockRead,
91         BtLockWrite,
92         BtLockParent
93 } BtLock;
94
95 //      definition for phase-fair reader/writer lock implementation
96
97 typedef struct {
98         ushort rin[1];
99         ushort rout[1];
100         ushort ticket[1];
101         ushort serving[1];
102 } RWLock;
103
104 #define PHID 0x1
105 #define PRES 0x2
106 #define MASK 0x3
107 #define RINC 0x4
108
109 //      definition for spin latch implementation
110
111 // exclusive is set for write access
112 // share is count of read accessors
113 // grant write lock when share == 0
114
115 volatile typedef struct {
116         ushort exclusive:1;
117         ushort pending:1;
118         ushort share:14;
119 } BtSpinLatch;
120
121 #define XCL 1
122 #define PEND 2
123 #define BOTH 3
124 #define SHARE 4
125
126 //  hash table entries
127
128 typedef struct {
129         BtSpinLatch latch[1];
130         volatile ushort slot;           // Latch table entry at head of chain
131 } BtHashEntry;
132
133 //      latch manager table structure
134
135 typedef struct {
136         RWLock readwr[1];               // read/write page lock
137         RWLock access[1];               // Access Intent/Page delete
138         RWLock parent[1];               // Posting of fence key in parent
139         BtSpinLatch busy[1];            // slot is being moved between chains
140         volatile ushort next;           // next entry in hash table chain
141         volatile ushort prev;           // prev entry in hash table chain
142         volatile ushort pin;            // number of outstanding locks
143         volatile ushort hash;           // hash slot entry is under
144         volatile uid page_no;           // latch set page number
145 } BtLatchSet;
146
147 //      Define the length of the page and key pointers
148
149 #define BtId 6
150
151 //      Page key slot definition.
152
153 //      Keys are marked dead, but remain on the page until
154 //      it cleanup is called. The fence key (highest key) for
155 //      a leaf page is always present, even after cleanup.
156
157 //      Slot types
158
159 //      In addition to the Unique keys that occupy slots
160 //      there are Librarian and Duplicate key
161 //      slots occupying the key slot array.
162
163 //      The Librarian slots are dead keys that
164 //      serve as filler, available to add new Unique
165 //      or Dup slots that are inserted into the B-tree.
166
167 //      The Duplicate slots have had their key bytes extended
168 //      by 6 bytes to contain a binary duplicate key uniqueifier.
169
170 typedef enum {
171         Unique,
172         Librarian,
173         Duplicate
174 } BtSlotType;
175
176 typedef struct {
177         uint off:BT_maxbits;    // page offset for key start
178         uint type:3;                    // type of slot
179         uint dead:1;                    // set for deleted slot
180 } BtSlot;
181
182 //      The key structure occupies space at the upper end of
183 //      each page.  It's a length byte followed by the key
184 //      bytes.
185
186 typedef struct {
187         unsigned char len;
188         unsigned char key[1];
189 } *BtKey;
190
191 //      the value structure also occupies space at the upper
192 //      end of the page. Each key is immediately followed by a value.
193
194 typedef struct {
195         unsigned char len;
196         unsigned char value[1];
197 } *BtVal;
198
199 //      The first part of an index page.
200 //      It is immediately followed
201 //      by the BtSlot array of keys.
202
203 //      note that this structure size
204 //      must be a multiple of 8 bytes
205 //      in order to place dups correctly.
206
207 typedef struct BtPage_ {
208         uint cnt;                                       // count of keys in page
209         uint act;                                       // count of active keys
210         uint min;                                       // next key offset
211         uint garbage;                           // page garbage in bytes
212         unsigned char bits:7;           // page size in bits
213         unsigned char free:1;           // page is on free chain
214         unsigned char lvl:7;            // level of page
215         unsigned char kill:1;           // page is being deleted
216         unsigned char left[BtId];       // page number to left
217         unsigend char filler[2];        // padding to multiple of 8
218         unsigned char right[BtId];      // page number to right
219 } *BtPage;
220
221 //      The memory mapping pool table buffer manager entry
222
223 typedef struct {
224         uid  basepage;                          // mapped base page number
225         char *map;                                      // mapped memory pointer
226         ushort slot;                            // slot index in this array
227         ushort pin;                                     // mapped page pin counter
228         void *hashprev;                         // previous pool entry for the same hash idx
229         void *hashnext;                         // next pool entry for the same hash idx
230 #ifndef unix
231         HANDLE hmap;                            // Windows memory mapping handle
232 #endif
233 } BtPool;
234
235 #define CLOCK_bit 0x8000                // bit in pool->pin
236
237 //  The loadpage interface object
238
239 typedef struct {
240         uid page_no;            // current page number
241         BtPage page;            // current page pointer
242         BtPool *pool;           // current page pool
243         BtLatchSet *latch;      // current page latch set
244 } BtPageSet;
245
246 //      structure for latch manager on ALLOC_page
247
248 typedef struct {
249         struct BtPage_ alloc[1];        // next page_no in right ptr
250         unsigned long long dups[1];     // global duplicate key uniqueifier
251         unsigned char chain[BtId];      // head of free page_nos chain
252         BtSpinLatch lock[1];            // allocation area lite latch
253         ushort latchdeployed;           // highest number of latch entries deployed
254         ushort nlatchpage;                      // number of latch pages at BT_latch
255         ushort latchtotal;                      // number of page latch entries
256         ushort latchhash;                       // number of latch hash table slots
257         ushort latchvictim;                     // next latch entry to examine
258         BtHashEntry table[0];           // the hash table
259 } BtLatchMgr;
260
261 //      The object structure for Btree access
262
263 typedef struct {
264         uint page_size;                         // page size    
265         uint page_bits;                         // page size in bits    
266         uint seg_bits;                          // seg size in pages in bits
267         uint mode;                                      // read-write mode
268 #ifdef unix
269         int idx;
270 #else
271         HANDLE idx;
272 #endif
273         ushort poolcnt;                         // highest page pool node in use
274         ushort poolmax;                         // highest page pool node allocated
275         ushort poolmask;                        // total number of pages in mmap segment - 1
276         ushort hashsize;                        // size of Hash Table for pool entries
277         volatile uint evicted;          // last evicted hash table slot
278         ushort *hash;                           // pool index for hash entries
279         BtSpinLatch *latch;                     // latches for hash table slots
280         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
281         BtLatchSet *latchsets;          // mapped latch set from latch pages
282         BtPool *pool;                           // memory pool page segments
283 #ifndef unix
284         HANDLE halloc;                          // allocation and latch table handle
285 #endif
286 } BtMgr;
287
288 typedef struct {
289         BtMgr *mgr;                             // buffer manager for thread
290         BtPage cursor;                  // cached frame for start/next (never mapped)
291         BtPage frame;                   // spare frame for the page split (never mapped)
292         uid cursor_page;                        // current cursor page number   
293         unsigned char *mem;             // frame, cursor, page memory buffer
294         unsigned char key[256]; // last found complete key
295         int found;                              // last delete or insert was found
296         int err;                                // last error
297 } BtDb;
298
299 typedef enum {
300         BTERR_ok = 0,
301         BTERR_struct,
302         BTERR_ovflw,
303         BTERR_lock,
304         BTERR_map,
305         BTERR_wrt,
306         BTERR_hash
307 } BTERR;
308
309 // B-Tree functions
310 extern void bt_close (BtDb *bt);
311 extern BtDb *bt_open (BtMgr *mgr);
312 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
313 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
314 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
315 extern BtKey bt_foundkey (BtDb *bt);
316 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
317 extern uint bt_nextkey   (BtDb *bt, uint slot);
318
319 //      manager functions
320 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
321 void bt_mgrclose (BtMgr *mgr);
322
323 //  Helper functions to return slot values
324 //      from the cursor page.
325
326 extern BtKey bt_key (BtDb *bt, uint slot);
327 extern BtVal bt_val (BtDb *bt, uint slot);
328
329 //  BTree page number constants
330 #define ALLOC_page              0       // allocation & latch manager hash table
331 #define ROOT_page               1       // root of the btree
332 #define LEAF_page               2       // first page of leaves
333 #define LATCH_page              3       // pages for latch manager
334
335 //      Number of levels to create in a new BTree
336
337 #define MIN_lvl                 2
338
339 //  The page is allocated from low and hi ends.
340 //  The key slots are allocated from the bottom,
341 //      while the text and value of the key
342 //  are allocated from the top.  When the two
343 //  areas meet, the page is split into two.
344
345 //  A key consists of a length byte, two bytes of
346 //  index number (0 - 65534), and up to 253 bytes
347 //  of key value.
348
349 //  Associated with each key is a value byte string
350 //      containing any value desired.
351
352 //  The b-tree root is always located at page 1.
353 //      The first leaf page of level zero is always
354 //      located on page 2.
355
356 //      The b-tree pages are linked with next
357 //      pointers to facilitate enumerators,
358 //      and provide for concurrency.
359
360 //      When to root page fills, it is split in two and
361 //      the tree height is raised by a new root at page
362 //      one with two keys.
363
364 //      Deleted keys are marked with a dead bit until
365 //      page cleanup. The fence key for a leaf node is
366 //      always present
367
368 //  Groups of pages called segments from the btree are optionally
369 //  cached with a memory mapped pool. A hash table is used to keep
370 //  track of the cached segments.  This behaviour is controlled
371 //  by the cache block size parameter to bt_open.
372
373 //  To achieve maximum concurrency one page is locked at a time
374 //  as the tree is traversed to find leaf key in question. The right
375 //  page numbers are used in cases where the page is being split,
376 //      or consolidated.
377
378 //  Page 0 is dedicated to lock for new page extensions,
379 //      and chains empty pages together for reuse. It also
380 //      contains the latch manager hash table.
381
382 //      The ParentModification lock on a node is obtained to serialize posting
383 //      or changing the fence key for a node.
384
385 //      Empty pages are chained together through the ALLOC page and reused.
386
387 //      Access macros to address slot and key values from the page
388 //      Page slots use 1 based indexing.
389
390 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
391 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
392 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
393
394 void bt_putid(unsigned char *dest, uid id)
395 {
396 int i = BtId;
397
398         while( i-- )
399                 dest[i] = (unsigned char)id, id >>= 8;
400 }
401
402 uid bt_getid(unsigned char *src)
403 {
404 uid id = 0;
405 int i;
406
407         for( i = 0; i < BtId; i++ )
408                 id <<= 8, id |= *src++; 
409
410         return id;
411 }
412
413 uid bt_newdup (BtDb *bt)
414 {
415 #ifdef unix
416         return __sync_fetch_and_add (bt->mgr->latchmgr->dups, 1) + 1;
417 #else
418         return _InterlockedIncrement64(bt->mgr->latchmgr->dups, 1);
419 #endif
420 }
421
422 //      Phase-Fair reader/writer lock implementation
423
424 void WriteLock (RWLock *lock)
425 {
426 ushort w, r, tix;
427
428 #ifdef unix
429         tix = __sync_fetch_and_add (lock->ticket, 1);
430 #else
431         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
432 #endif
433         // wait for our ticket to come up
434
435         while( tix != lock->serving[0] )
436 #ifdef unix
437                 sched_yield();
438 #else
439                 SwitchToThread ();
440 #endif
441
442         w = PRES | (tix & PHID);
443 #ifdef  unix
444         r = __sync_fetch_and_add (lock->rin, w);
445 #else
446         r = _InterlockedExchangeAdd16 (lock->rin, w);
447 #endif
448         while( r != *lock->rout )
449 #ifdef unix
450                 sched_yield();
451 #else
452                 SwitchToThread();
453 #endif
454 }
455
456 void WriteRelease (RWLock *lock)
457 {
458 #ifdef unix
459         __sync_fetch_and_and (lock->rin, ~MASK);
460 #else
461         _InterlockedAnd16 (lock->rin, ~MASK);
462 #endif
463         lock->serving[0]++;
464 }
465
466 void ReadLock (RWLock *lock)
467 {
468 ushort w;
469 #ifdef unix
470         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
471 #else
472         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
473 #endif
474         if( w )
475           while( w == (*lock->rin & MASK) )
476 #ifdef unix
477                 sched_yield ();
478 #else
479                 SwitchToThread ();
480 #endif
481 }
482
483 void ReadRelease (RWLock *lock)
484 {
485 #ifdef unix
486         __sync_fetch_and_add (lock->rout, RINC);
487 #else
488         _InterlockedExchangeAdd16 (lock->rout, RINC);
489 #endif
490 }
491
492 //      Spin Latch Manager
493
494 //      wait until write lock mode is clear
495 //      and add 1 to the share count
496
497 void bt_spinreadlock(BtSpinLatch *latch)
498 {
499 ushort prev;
500
501   do {
502 #ifdef unix
503         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
504 #else
505         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
506 #endif
507         //  see if exclusive request is granted or pending
508
509         if( !(prev & BOTH) )
510                 return;
511 #ifdef unix
512         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
513 #else
514         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
515 #endif
516 #ifdef  unix
517   } while( sched_yield(), 1 );
518 #else
519   } while( SwitchToThread(), 1 );
520 #endif
521 }
522
523 //      wait for other read and write latches to relinquish
524
525 void bt_spinwritelock(BtSpinLatch *latch)
526 {
527 ushort prev;
528
529   do {
530 #ifdef  unix
531         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
532 #else
533         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
534 #endif
535         if( !(prev & XCL) )
536           if( !(prev & ~BOTH) )
537                 return;
538           else
539 #ifdef unix
540                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
541 #else
542                 _InterlockedAnd16((ushort *)latch, ~XCL);
543 #endif
544 #ifdef  unix
545   } while( sched_yield(), 1 );
546 #else
547   } while( SwitchToThread(), 1 );
548 #endif
549 }
550
551 //      try to obtain write lock
552
553 //      return 1 if obtained,
554 //              0 otherwise
555
556 int bt_spinwritetry(BtSpinLatch *latch)
557 {
558 ushort prev;
559
560 #ifdef  unix
561         prev = __sync_fetch_and_or((ushort *)latch, XCL);
562 #else
563         prev = _InterlockedOr16((ushort *)latch, XCL);
564 #endif
565         //      take write access if all bits are clear
566
567         if( !(prev & XCL) )
568           if( !(prev & ~BOTH) )
569                 return 1;
570           else
571 #ifdef unix
572                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
573 #else
574                 _InterlockedAnd16((ushort *)latch, ~XCL);
575 #endif
576         return 0;
577 }
578
579 //      clear write mode
580
581 void bt_spinreleasewrite(BtSpinLatch *latch)
582 {
583 #ifdef unix
584         __sync_fetch_and_and((ushort *)latch, ~BOTH);
585 #else
586         _InterlockedAnd16((ushort *)latch, ~BOTH);
587 #endif
588 }
589
590 //      decrement reader count
591
592 void bt_spinreleaseread(BtSpinLatch *latch)
593 {
594 #ifdef unix
595         __sync_fetch_and_add((ushort *)latch, -SHARE);
596 #else
597         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
598 #endif
599 }
600
601 //      link latch table entry into latch hash table
602
603 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
604 {
605 BtLatchSet *set = bt->mgr->latchsets + victim;
606
607         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
608                 bt->mgr->latchsets[set->next].prev = victim;
609
610         bt->mgr->latchmgr->table[hashidx].slot = victim;
611         set->page_no = page_no;
612         set->hash = hashidx;
613         set->prev = 0;
614 }
615
616 //      release latch pin
617
618 void bt_unpinlatch (BtLatchSet *set)
619 {
620 #ifdef unix
621         __sync_fetch_and_add(&set->pin, -1);
622 #else
623         _InterlockedDecrement16 (&set->pin);
624 #endif
625 }
626
627 //      find existing latchset or inspire new one
628 //      return with latchset pinned
629
630 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
631 {
632 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
633 ushort slot, avail = 0, victim, idx;
634 BtLatchSet *set;
635
636         //  obtain read lock on hash table entry
637
638         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
639
640         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
641         {
642                 set = bt->mgr->latchsets + slot;
643                 if( page_no == set->page_no )
644                         break;
645         } while( slot = set->next );
646
647         if( slot ) {
648 #ifdef unix
649                 __sync_fetch_and_add(&set->pin, 1);
650 #else
651                 _InterlockedIncrement16 (&set->pin);
652 #endif
653         }
654
655     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
656
657         if( slot )
658                 return set;
659
660   //  try again, this time with write lock
661
662   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
663
664   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
665   {
666         set = bt->mgr->latchsets + slot;
667         if( page_no == set->page_no )
668                 break;
669         if( !set->pin && !avail )
670                 avail = slot;
671   } while( slot = set->next );
672
673   //  found our entry, or take over an unpinned one
674
675   if( slot || (slot = avail) ) {
676         set = bt->mgr->latchsets + slot;
677 #ifdef unix
678         __sync_fetch_and_add(&set->pin, 1);
679 #else
680         _InterlockedIncrement16 (&set->pin);
681 #endif
682         set->page_no = page_no;
683         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
684         return set;
685   }
686
687         //  see if there are any unused entries
688 #ifdef unix
689         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
690 #else
691         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
692 #endif
693
694         if( victim < bt->mgr->latchmgr->latchtotal ) {
695                 set = bt->mgr->latchsets + victim;
696 #ifdef unix
697                 __sync_fetch_and_add(&set->pin, 1);
698 #else
699                 _InterlockedIncrement16 (&set->pin);
700 #endif
701                 bt_latchlink (bt, hashidx, victim, page_no);
702                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
703                 return set;
704         }
705
706 #ifdef unix
707         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
708 #else
709         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
710 #endif
711   //  find and reuse previous lock entry
712
713   while( 1 ) {
714 #ifdef unix
715         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
716 #else
717         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
718 #endif
719         //      we don't use slot zero
720
721         if( victim %= bt->mgr->latchmgr->latchtotal )
722                 set = bt->mgr->latchsets + victim;
723         else
724                 continue;
725
726         //      take control of our slot
727         //      from other threads
728
729         if( set->pin || !bt_spinwritetry (set->busy) )
730                 continue;
731
732         idx = set->hash;
733
734         // try to get write lock on hash chain
735         //      skip entry if not obtained
736         //      or has outstanding locks
737
738         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
739                 bt_spinreleasewrite (set->busy);
740                 continue;
741         }
742
743         if( set->pin ) {
744                 bt_spinreleasewrite (set->busy);
745                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
746                 continue;
747         }
748
749         //  unlink our available victim from its hash chain
750
751         if( set->prev )
752                 bt->mgr->latchsets[set->prev].next = set->next;
753         else
754                 bt->mgr->latchmgr->table[idx].slot = set->next;
755
756         if( set->next )
757                 bt->mgr->latchsets[set->next].prev = set->prev;
758
759         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
760 #ifdef unix
761         __sync_fetch_and_add(&set->pin, 1);
762 #else
763         _InterlockedIncrement16 (&set->pin);
764 #endif
765         bt_latchlink (bt, hashidx, victim, page_no);
766         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
767         bt_spinreleasewrite (set->busy);
768         return set;
769   }
770 }
771
772 void bt_mgrclose (BtMgr *mgr)
773 {
774 BtPool *pool;
775 uint slot;
776
777         // release mapped pages
778         //      note that slot zero is never used
779
780         for( slot = 1; slot < mgr->poolmax; slot++ ) {
781                 pool = mgr->pool + slot;
782                 if( pool->slot )
783 #ifdef unix
784                         munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
785 #else
786                 {
787                         FlushViewOfFile(pool->map, 0);
788                         UnmapViewOfFile(pool->map);
789                         CloseHandle(pool->hmap);
790                 }
791 #endif
792         }
793
794 #ifdef unix
795         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
796         munmap (mgr->latchmgr, 2 * mgr->page_size);
797 #else
798         FlushViewOfFile(mgr->latchmgr, 0);
799         UnmapViewOfFile(mgr->latchmgr);
800         CloseHandle(mgr->halloc);
801 #endif
802 #ifdef unix
803         close (mgr->idx);
804         free (mgr->pool);
805         free (mgr->hash);
806         free ((void *)mgr->latch);
807         free (mgr);
808 #else
809         FlushFileBuffers(mgr->idx);
810         CloseHandle(mgr->idx);
811         GlobalFree (mgr->pool);
812         GlobalFree (mgr->hash);
813         GlobalFree ((void *)mgr->latch);
814         GlobalFree (mgr);
815 #endif
816 }
817
818 //      close and release memory
819
820 void bt_close (BtDb *bt)
821 {
822 #ifdef unix
823         if( bt->mem )
824                 free (bt->mem);
825 #else
826         if( bt->mem)
827                 VirtualFree (bt->mem, 0, MEM_RELEASE);
828 #endif
829         free (bt);
830 }
831
832 //  open/create new btree buffer manager
833
834 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
835 //              size of mapped page pool (e.g. 8192)
836
837 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
838 {
839 uint lvl, attr, cacheblk, last, slot, idx;
840 uint nlatchpage, latchhash;
841 unsigned char value[BtId];
842 BtLatchMgr *latchmgr;
843 off64_t size;
844 uint amt[1];
845 BtMgr* mgr;
846 BtKey key;
847 BtVal val;
848 int flag;
849
850 #ifndef unix
851 SYSTEM_INFO sysinfo[1];
852 #endif
853
854         // determine sanity of page size and buffer pool
855
856         if( bits > BT_maxbits )
857                 bits = BT_maxbits;
858         else if( bits < BT_minbits )
859                 bits = BT_minbits;
860
861         if( !poolmax )
862                 return NULL;    // must have buffer pool
863
864 #ifdef unix
865         mgr = calloc (1, sizeof(BtMgr));
866
867         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
868
869         if( mgr->idx == -1 )
870                 return free(mgr), NULL;
871         
872         cacheblk = 4096;        // minimum mmap segment size for unix
873
874 #else
875         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
876         attr = FILE_ATTRIBUTE_NORMAL;
877         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
878
879         if( mgr->idx == INVALID_HANDLE_VALUE )
880                 return GlobalFree(mgr), NULL;
881
882         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
883         GetSystemInfo(sysinfo);
884         cacheblk = sysinfo->dwAllocationGranularity;
885 #endif
886
887 #ifdef unix
888         latchmgr = malloc (BT_maxpage);
889         *amt = 0;
890
891         // read minimum page size to get root info
892
893         if( size = lseek (mgr->idx, 0L, 2) ) {
894                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
895                         bits = latchmgr->alloc->bits;
896                 else
897                         return free(mgr), free(latchmgr), NULL;
898         } else if( mode == BT_ro )
899                 return free(latchmgr), bt_mgrclose (mgr), NULL;
900 #else
901         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
902         size = GetFileSize(mgr->idx, amt);
903
904         if( size || *amt ) {
905                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
906                         return bt_mgrclose (mgr), NULL;
907                 bits = latchmgr->alloc->bits;
908         } else if( mode == BT_ro )
909                 return bt_mgrclose (mgr), NULL;
910 #endif
911
912         mgr->page_size = 1 << bits;
913         mgr->page_bits = bits;
914
915         mgr->poolmax = poolmax;
916         mgr->mode = mode;
917
918         if( cacheblk < mgr->page_size )
919                 cacheblk = mgr->page_size;
920
921         //  mask for partial memmaps
922
923         mgr->poolmask = (cacheblk >> bits) - 1;
924
925         //      see if requested size of pages per memmap is greater
926
927         if( (1 << segsize) > mgr->poolmask )
928                 mgr->poolmask = (1 << segsize) - 1;
929
930         mgr->seg_bits = 0;
931
932         while( (1 << mgr->seg_bits) <= mgr->poolmask )
933                 mgr->seg_bits++;
934
935         mgr->hashsize = hashsize;
936
937 #ifdef unix
938         mgr->pool = calloc (poolmax, sizeof(BtPool));
939         mgr->hash = calloc (hashsize, sizeof(ushort));
940         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
941 #else
942         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
943         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
944         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
945 #endif
946
947         if( size || *amt )
948                 goto mgrlatch;
949
950         // initialize an empty b-tree with latch page, root page, page of leaves
951         // and page(s) of latches
952
953         memset (latchmgr, 0, 1 << bits);
954         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
955         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
956         latchmgr->alloc->bits = mgr->page_bits;
957
958         latchmgr->nlatchpage = nlatchpage;
959         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
960
961         //  initialize latch manager
962
963         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
964
965         //      size of hash table = total number of latchsets
966
967         if( latchhash > latchmgr->latchtotal )
968                 latchhash = latchmgr->latchtotal;
969
970         latchmgr->latchhash = latchhash;
971
972 #ifdef unix
973         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
974                 return bt_mgrclose (mgr), NULL;
975 #else
976         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
977                 return bt_mgrclose (mgr), NULL;
978
979         if( *amt < mgr->page_size )
980                 return bt_mgrclose (mgr), NULL;
981 #endif
982
983         memset (latchmgr, 0, 1 << bits);
984         latchmgr->alloc->bits = mgr->page_bits;
985
986         for( lvl=MIN_lvl; lvl--; ) {
987                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
988                 key = keyptr(latchmgr->alloc, 1);
989                 key->len = 2;           // create stopper key
990                 key->key[0] = 0xff;
991                 key->key[1] = 0xff;
992
993                 bt_putid(value, MIN_lvl - lvl + 1);
994                 val = valptr(latchmgr->alloc, 1);
995                 val->len = lvl ? BtId : 0;
996                 memcpy (val->value, value, val->len);
997
998                 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
999                 latchmgr->alloc->lvl = lvl;
1000                 latchmgr->alloc->cnt = 1;
1001                 latchmgr->alloc->act = 1;
1002 #ifdef unix
1003                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
1004                         return bt_mgrclose (mgr), NULL;
1005 #else
1006                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1007                         return bt_mgrclose (mgr), NULL;
1008
1009                 if( *amt < mgr->page_size )
1010                         return bt_mgrclose (mgr), NULL;
1011 #endif
1012         }
1013
1014         // clear out latch manager locks
1015         //      and rest of pages to round out segment
1016
1017         memset(latchmgr, 0, mgr->page_size);
1018         last = MIN_lvl + 1;
1019
1020         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
1021 #ifdef unix
1022                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1023 #else
1024                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1025                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1026                         return bt_mgrclose (mgr), NULL;
1027                 if( *amt < mgr->page_size )
1028                         return bt_mgrclose (mgr), NULL;
1029 #endif
1030                 last++;
1031         }
1032
1033 mgrlatch:
1034 #ifdef unix
1035         // mlock the root page and the latchmgr page
1036
1037         flag = PROT_READ | PROT_WRITE;
1038         mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1039         if( mgr->latchmgr == MAP_FAILED )
1040                 return bt_mgrclose (mgr), NULL;
1041         mlock (mgr->latchmgr, 2 * mgr->page_size);
1042
1043         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1044         if( mgr->latchsets == MAP_FAILED )
1045                 return bt_mgrclose (mgr), NULL;
1046         mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1047 #else
1048         flag = PAGE_READWRITE;
1049         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1050         if( !mgr->halloc )
1051                 return bt_mgrclose (mgr), NULL;
1052
1053         flag = FILE_MAP_WRITE;
1054         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1055         if( !mgr->latchmgr )
1056                 return GetLastError(), bt_mgrclose (mgr), NULL;
1057
1058         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1059 #endif
1060
1061 #ifdef unix
1062         free (latchmgr);
1063 #else
1064         VirtualFree (latchmgr, 0, MEM_RELEASE);
1065 #endif
1066         return mgr;
1067 }
1068
1069 //      open BTree access method
1070 //      based on buffer manager
1071
1072 BtDb *bt_open (BtMgr *mgr)
1073 {
1074 BtDb *bt = malloc (sizeof(*bt));
1075
1076         memset (bt, 0, sizeof(*bt));
1077         bt->mgr = mgr;
1078 #ifdef unix
1079         bt->mem = malloc (2 *mgr->page_size);
1080 #else
1081         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1082 #endif
1083         bt->frame = (BtPage)bt->mem;
1084         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1085         return bt;
1086 }
1087
1088 //  compare two keys, returning > 0, = 0, or < 0
1089 //  as the comparison value
1090
1091 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1092 {
1093 uint len1 = key1->len;
1094 int ans;
1095
1096         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1097                 return ans;
1098
1099         if( len1 > len2 )
1100                 return 1;
1101         if( len1 < len2 )
1102                 return -1;
1103
1104         return 0;
1105 }
1106
1107 //      Buffer Pool mgr
1108
1109 // find segment in pool
1110 // must be called with hashslot idx locked
1111 //      return NULL if not there
1112 //      otherwise return node
1113
1114 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1115 {
1116 BtPool *pool;
1117 uint slot;
1118
1119         // compute start of hash chain in pool
1120
1121         if( slot = bt->mgr->hash[idx] ) 
1122                 pool = bt->mgr->pool + slot;
1123         else
1124                 return NULL;
1125
1126         page_no &= ~bt->mgr->poolmask;
1127
1128         while( pool->basepage != page_no )
1129           if( pool = pool->hashnext )
1130                 continue;
1131           else
1132                 return NULL;
1133
1134         return pool;
1135 }
1136
1137 // add segment to hash table
1138
1139 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1140 {
1141 BtPool *node;
1142 uint slot;
1143
1144         pool->hashprev = pool->hashnext = NULL;
1145         pool->basepage = page_no & ~bt->mgr->poolmask;
1146         pool->pin = CLOCK_bit + 1;
1147
1148         if( slot = bt->mgr->hash[idx] ) {
1149                 node = bt->mgr->pool + slot;
1150                 pool->hashnext = node;
1151                 node->hashprev = pool;
1152         }
1153
1154         bt->mgr->hash[idx] = pool->slot;
1155 }
1156
1157 //  map new buffer pool segment to virtual memory
1158
1159 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1160 {
1161 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1162 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1163 int flag;
1164
1165 #ifdef unix
1166         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1167         pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1168         if( pool->map == MAP_FAILED )
1169                 return bt->err = BTERR_map;
1170
1171 #else
1172         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1173         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1174         if( !pool->hmap )
1175                 return bt->err = BTERR_map;
1176
1177         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1178         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1179         if( !pool->map )
1180                 return bt->err = BTERR_map;
1181 #endif
1182         return bt->err = 0;
1183 }
1184
1185 //      calculate page within pool
1186
1187 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1188 {
1189 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1190 BtPage page;
1191
1192         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1193 //      madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1194         return page;
1195 }
1196
1197 //  release pool pin
1198
1199 void bt_unpinpool (BtPool *pool)
1200 {
1201 #ifdef unix
1202         __sync_fetch_and_add(&pool->pin, -1);
1203 #else
1204         _InterlockedDecrement16 (&pool->pin);
1205 #endif
1206 }
1207
1208 //      find or place requested page in segment-pool
1209 //      return pool table entry, incrementing pin
1210
1211 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1212 {
1213 uint slot, hashidx, idx, victim;
1214 BtPool *pool, *node, *next;
1215
1216         //      lock hash table chain
1217
1218         hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1219         bt_spinwritelock (&bt->mgr->latch[hashidx]);
1220
1221         //      look up in hash table
1222
1223         if( pool = bt_findpool(bt, page_no, hashidx) ) {
1224 #ifdef unix
1225                 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1226                 __sync_fetch_and_add(&pool->pin, 1);
1227 #else
1228                 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1229                 _InterlockedIncrement16 (&pool->pin);
1230 #endif
1231                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1232                 return pool;
1233         }
1234
1235         // allocate a new pool node
1236         // and add to hash table
1237
1238 #ifdef unix
1239         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1240 #else
1241         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1242 #endif
1243
1244         if( ++slot < bt->mgr->poolmax ) {
1245                 pool = bt->mgr->pool + slot;
1246                 pool->slot = slot;
1247
1248                 if( bt_mapsegment(bt, pool, page_no) )
1249                         return NULL;
1250
1251                 bt_linkhash(bt, pool, page_no, hashidx);
1252                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1253                 return pool;
1254         }
1255
1256         // pool table is full
1257         //      find best pool entry to evict
1258
1259 #ifdef unix
1260         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1261 #else
1262         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1263 #endif
1264
1265         while( 1 ) {
1266 #ifdef unix
1267                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1268 #else
1269                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1270 #endif
1271                 victim %= bt->mgr->poolmax;
1272                 pool = bt->mgr->pool + victim;
1273                 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1274
1275                 if( !victim )
1276                         continue;
1277
1278                 // try to get write lock
1279                 //      skip entry if not obtained
1280
1281                 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1282                         continue;
1283
1284                 //      skip this entry if
1285                 //      page is pinned
1286                 //  or clock bit is set
1287
1288                 if( pool->pin ) {
1289 #ifdef unix
1290                         __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1291 #else
1292                         _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1293 #endif
1294                         bt_spinreleasewrite (&bt->mgr->latch[idx]);
1295                         continue;
1296                 }
1297
1298                 // unlink victim pool node from hash table
1299
1300                 if( node = pool->hashprev )
1301                         node->hashnext = pool->hashnext;
1302                 else if( node = pool->hashnext )
1303                         bt->mgr->hash[idx] = node->slot;
1304                 else
1305                         bt->mgr->hash[idx] = 0;
1306
1307                 if( node = pool->hashnext )
1308                         node->hashprev = pool->hashprev;
1309
1310                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1311
1312                 //      remove old file mapping
1313 #ifdef unix
1314                 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1315 #else
1316 //              FlushViewOfFile(pool->map, 0);
1317                 UnmapViewOfFile(pool->map);
1318                 CloseHandle(pool->hmap);
1319 #endif
1320                 pool->map = NULL;
1321
1322                 //  create new pool mapping
1323                 //  and link into hash table
1324
1325                 if( bt_mapsegment(bt, pool, page_no) )
1326                         return NULL;
1327
1328                 bt_linkhash(bt, pool, page_no, hashidx);
1329                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1330                 return pool;
1331         }
1332 }
1333
1334 // place write, read, or parent lock on requested page_no.
1335
1336 void bt_lockpage(BtLock mode, BtLatchSet *set)
1337 {
1338         switch( mode ) {
1339         case BtLockRead:
1340                 ReadLock (set->readwr);
1341                 break;
1342         case BtLockWrite:
1343                 WriteLock (set->readwr);
1344                 break;
1345         case BtLockAccess:
1346                 ReadLock (set->access);
1347                 break;
1348         case BtLockDelete:
1349                 WriteLock (set->access);
1350                 break;
1351         case BtLockParent:
1352                 WriteLock (set->parent);
1353                 break;
1354         }
1355 }
1356
1357 // remove write, read, or parent lock on requested page
1358
1359 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1360 {
1361         switch( mode ) {
1362         case BtLockRead:
1363                 ReadRelease (set->readwr);
1364                 break;
1365         case BtLockWrite:
1366                 WriteRelease (set->readwr);
1367                 break;
1368         case BtLockAccess:
1369                 ReadRelease (set->access);
1370                 break;
1371         case BtLockDelete:
1372                 WriteRelease (set->access);
1373                 break;
1374         case BtLockParent:
1375                 WriteRelease (set->parent);
1376                 break;
1377         }
1378 }
1379
1380 //      allocate a new page
1381
1382 BTERR bt_newpage(BtDb *bt, BtPageSet *set)
1383 {
1384 int blk;
1385
1386         //      lock allocation page
1387
1388         bt_spinwritelock(bt->mgr->latchmgr->lock);
1389
1390         // use empty chain first
1391         // else allocate empty page
1392
1393         if( set->page_no = bt_getid(bt->mgr->latchmgr->chain) ) {
1394                 if( set->pool = bt_pinpool (bt, set->page_no) )
1395                         set->page = bt_page (bt, set->pool, set->page_no);
1396                 else
1397                         return bt->err = BTERR_struct;
1398
1399                 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1400         } else {
1401                 set->page_no = bt_getid(bt->mgr->latchmgr->alloc->right);
1402                 bt_putid(bt->mgr->latchmgr->alloc->right, set->page_no+1);
1403 #ifdef unix
1404                 // if writing first page of pool block, set file length thru last page
1405
1406                 if( (set->page_no & bt->mgr->poolmask) == 0 )
1407                         ftruncate (bt->mgr->idx, (set->page_no + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1408 #endif
1409                 if( set->pool = bt_pinpool (bt, set->page_no) )
1410                         set->page = bt_page (bt, set->pool, set->page_no);
1411                 else
1412                         return bt->err;
1413         }
1414
1415         // unlock allocation latch
1416
1417         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1418         return 0;
1419 }
1420
1421 //  find slot in page for given key at a given level
1422
1423 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1424 {
1425 uint diff, higher = set->page->cnt, low = 1, slot;
1426 uint good = 0;
1427
1428         //        make stopper key an infinite fence value
1429
1430         if( bt_getid (set->page->right) )
1431                 higher++;
1432         else
1433                 good++;
1434
1435         //      low is the lowest candidate.
1436         //  loop ends when they meet
1437
1438         //  higher is already
1439         //      tested as .ge. the passed key.
1440
1441         while( diff = higher - low ) {
1442                 slot = low + ( diff >> 1 );
1443                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1444                         low = slot + 1;
1445                 else
1446                         higher = slot, good++;
1447         }
1448
1449         //      return zero if key is on right link page
1450
1451         return good ? higher : 0;
1452 }
1453
1454 //  find and load page at given level for given key
1455 //      leave page rd or wr locked as requested
1456
1457 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1458 {
1459 uid page_no = ROOT_page, prevpage = 0;
1460 uint drill = 0xff, slot;
1461 BtLatchSet *prevlatch;
1462 uint mode, prevmode;
1463 BtPool *prevpool;
1464
1465   //  start at root of btree and drill down
1466
1467   do {
1468         // determine lock mode of drill level
1469         mode = (drill == lvl) ? lock : BtLockRead; 
1470
1471         set->latch = bt_pinlatch (bt, page_no);
1472         set->page_no = page_no;
1473
1474         // pin page contents
1475
1476         if( set->pool = bt_pinpool (bt, page_no) )
1477                 set->page = bt_page (bt, set->pool, page_no);
1478         else
1479                 return 0;
1480
1481         // obtain access lock using lock chaining with Access mode
1482
1483         if( page_no > ROOT_page )
1484           bt_lockpage(BtLockAccess, set->latch);
1485
1486         //      release & unpin parent page
1487
1488         if( prevpage ) {
1489           bt_unlockpage(prevmode, prevlatch);
1490           bt_unpinlatch (prevlatch);
1491           bt_unpinpool (prevpool);
1492           prevpage = 0;
1493         }
1494
1495         // obtain read lock using lock chaining
1496
1497         bt_lockpage(mode, set->latch);
1498
1499         if( set->page->free )
1500                 return bt->err = BTERR_struct, 0;
1501
1502         if( page_no > ROOT_page )
1503           bt_unlockpage(BtLockAccess, set->latch);
1504
1505         // re-read and re-lock root after determining actual level of root
1506
1507         if( set->page->lvl != drill) {
1508                 if( set->page_no != ROOT_page )
1509                         return bt->err = BTERR_struct, 0;
1510                         
1511                 drill = set->page->lvl;
1512
1513                 if( lock != BtLockRead && drill == lvl ) {
1514                   bt_unlockpage(mode, set->latch);
1515                   bt_unpinlatch (set->latch);
1516                   bt_unpinpool (set->pool);
1517                   continue;
1518                 }
1519         }
1520
1521         prevpage = set->page_no;
1522         prevlatch = set->latch;
1523         prevpool = set->pool;
1524         prevmode = mode;
1525
1526         //  find key on page at this level
1527         //  and descend to requested level
1528
1529         if( set->page->kill )
1530           goto slideright;
1531
1532         if( slot = bt_findslot (set, key, len) ) {
1533           if( drill == lvl )
1534                 return slot;
1535
1536           while( slotptr(set->page, slot)->dead )
1537                 if( slot++ < set->page->cnt )
1538                         continue;
1539                 else
1540                         goto slideright;
1541
1542           page_no = bt_getid(valptr(set->page, slot)->value);
1543           drill--;
1544           continue;
1545         }
1546
1547         //  or slide right into next page
1548
1549 slideright:
1550         page_no = bt_getid(set->page->right);
1551
1552   } while( page_no );
1553
1554   // return error on end of right chain
1555
1556   bt->err = BTERR_struct;
1557   return 0;     // return error
1558 }
1559
1560 //      return page to free list
1561 //      page must be delete & write locked
1562
1563 void bt_freepage (BtDb *bt, BtPageSet *set)
1564 {
1565         //      lock allocation page
1566
1567         bt_spinwritelock (bt->mgr->latchmgr->lock);
1568
1569         //      store chain
1570         memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1571         bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1572         set->page->free = 1;
1573
1574         // unlock released page
1575
1576         bt_unlockpage (BtLockDelete, set->latch);
1577         bt_unlockpage (BtLockWrite, set->latch);
1578         bt_unpinlatch (set->latch);
1579         bt_unpinpool (set->pool);
1580
1581         // unlock allocation page
1582
1583         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1584 }
1585
1586 //      a fence key was deleted from a page
1587 //      push new fence value upwards
1588
1589 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1590 {
1591 unsigned char leftkey[256], rightkey[256];
1592 unsigned char value[BtId];
1593 BtKey ptr;
1594 uint idx;
1595
1596         // collapse empty slots beneath our slot
1597
1598         while( idx = set->page->cnt - 1 )
1599           if( slotptr(set->page, idx)->dead ) {
1600                 *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1601                 memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1602           } else
1603                 break;
1604
1605         //      remove the old fence value
1606
1607         ptr = keyptr(set->page, set->page->cnt);
1608         memcpy (rightkey, ptr, ptr->len + 1);
1609         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1610
1611         //  cache new fence value
1612
1613         ptr = keyptr(set->page, set->page->cnt);
1614         memcpy (leftkey, ptr, ptr->len + 1);
1615
1616         bt_lockpage (BtLockParent, set->latch);
1617         bt_unlockpage (BtLockWrite, set->latch);
1618
1619         //      insert new (now smaller) fence key
1620
1621         bt_putid (value, set->page_no);
1622
1623         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId, 1) )
1624           return bt->err;
1625
1626         //      now delete old fence key
1627
1628         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1629                 return bt->err;
1630
1631         bt_unlockpage (BtLockParent, set->latch);
1632         bt_unpinlatch(set->latch);
1633         bt_unpinpool (set->pool);
1634         return 0;
1635 }
1636
1637 //      root has a single child
1638 //      collapse a level from the tree
1639
1640 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1641 {
1642 BtPageSet child[1];
1643 uint idx;
1644
1645   // find the child entry and promote as new root contents
1646
1647   do {
1648         for( idx = 0; idx++ < root->page->cnt; )
1649           if( !slotptr(root->page, idx)->dead )
1650                 break;
1651
1652         child->page_no = bt_getid (valptr(root->page, idx)->value);
1653
1654         child->latch = bt_pinlatch (bt, child->page_no);
1655         bt_lockpage (BtLockDelete, child->latch);
1656         bt_lockpage (BtLockWrite, child->latch);
1657
1658         if( child->pool = bt_pinpool (bt, child->page_no) )
1659                 child->page = bt_page (bt, child->pool, child->page_no);
1660         else
1661                 return bt->err;
1662
1663         memcpy (root->page, child->page, bt->mgr->page_size);
1664         bt_freepage (bt, child);
1665
1666   } while( root->page->lvl > 1 && root->page->act == 1 );
1667
1668   bt_unlockpage (BtLockWrite, root->latch);
1669   bt_unpinlatch (root->latch);
1670   bt_unpinpool (root->pool);
1671   return 0;
1672 }
1673
1674 //      maintain reverse scan pointers by
1675 //      linking left pointer of far right node
1676
1677 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1678 {
1679 BtPageSet right2[1];
1680
1681         //      keep track of rightmost leaf page
1682
1683         if( !right_page_no ) {
1684           bt_putid (bt->mgr->latchmgr->alloc->left, left_page_no);
1685           return 0;
1686         }
1687
1688         //      link right page to left page
1689
1690         right2->latch = bt_pinlatch (bt, right_page_no);
1691         bt_lockpage (BtLockWrite, right2->latch);
1692
1693         if( right2->pool = bt_pinpool (bt, right_page_no) )
1694                 right2->page = bt_page (bt, right2->pool, right_page_no);
1695         else
1696                 return bt->err;
1697
1698         bt_putid(right2->page->left, left_page_no);
1699         bt_unlockpage (BtLockWrite, right2->latch);
1700         bt_unpinlatch (right2->latch);
1701         return 0;
1702 }
1703
1704 //  find and delete key on page by marking delete flag bit
1705 //  if page becomes empty, delete it from the btree
1706
1707 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1708 {
1709 unsigned char lowerfence[256], higherfence[256];
1710 BtPageSet set[1], right[1], right2[1];
1711 unsigned char value[BtId];
1712 uint slot, idx, found;
1713 BtKey ptr, tst;
1714 BtVal val;
1715
1716         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1717                 ptr = keyptr(set->page, slot);
1718         else
1719                 return bt->err;
1720
1721         // if librarian slot, advance to real slot
1722
1723         if( slotptr(set->page, slot)->type == Librarian )
1724                 ptr = keyptr(set->page, ++slot);
1725
1726         // if key is found delete it, otherwise ignore request
1727
1728         if( found = !keycmp (ptr, key, len) )
1729           if( found = slotptr(set->page, slot)->dead == 0 ) {
1730                 val = valptr(set->page,slot);
1731                 slotptr(set->page, slot)->dead = 1;
1732                 set->page->garbage += ptr->len + val->len + 2;
1733                 set->page->act--;
1734           }
1735
1736         //      did we delete a fence key in an upper level?
1737
1738         if( found && lvl && set->page->act && slot == set->page->cnt )
1739           if( bt_fixfence (bt, set, lvl) )
1740                 return bt->err;
1741           else
1742                 return bt->found = found, 0;
1743
1744         //      do we need to collapse root?
1745
1746         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1747           if( bt_collapseroot (bt, set) )
1748                 return bt->err;
1749           else
1750                 return bt->found = found, 0;
1751
1752         //      return if page is not empty
1753
1754         if( set->page->act ) {
1755                 bt_unlockpage(BtLockWrite, set->latch);
1756                 bt_unpinlatch (set->latch);
1757                 bt_unpinpool (set->pool);
1758                 return bt->found = found, 0;
1759         }
1760
1761         //      cache copy of fence key
1762         //      to post in parent
1763
1764         ptr = keyptr(set->page, set->page->cnt);
1765         memcpy (lowerfence, ptr, ptr->len + 1);
1766
1767         //      obtain lock on right page
1768
1769         right->page_no = bt_getid(set->page->right);
1770         right->latch = bt_pinlatch (bt, right->page_no);
1771         bt_lockpage (BtLockWrite, right->latch);
1772
1773         // pin page contents
1774
1775         if( right->pool = bt_pinpool (bt, right->page_no) )
1776                 right->page = bt_page (bt, right->pool, right->page_no);
1777         else
1778                 return bt->err;
1779
1780         if( right->page->kill )
1781                 return bt->err = BTERR_struct;
1782
1783         // transfer left link
1784
1785         memcpy (right->page->left, set->page->left, BtId);
1786
1787         // pull contents of right peer into our empty page
1788
1789         memcpy (set->page, right->page, bt->mgr->page_size);
1790
1791         // update left link
1792
1793         if( !lvl )
1794           if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1795                 return bt->err;
1796
1797         // cache copy of key to update
1798
1799         ptr = keyptr(right->page, right->page->cnt);
1800         memcpy (higherfence, ptr, ptr->len + 1);
1801
1802         // mark right page deleted and point it to left page
1803         //      until we can post parent updates
1804
1805         bt_putid (right->page->right, set->page_no);
1806         right->page->kill = 1;
1807
1808         bt_lockpage (BtLockParent, right->latch);
1809         bt_unlockpage (BtLockWrite, right->latch);
1810
1811         bt_lockpage (BtLockParent, set->latch);
1812         bt_unlockpage (BtLockWrite, set->latch);
1813
1814         // redirect higher key directly to our new node contents
1815
1816         bt_putid (value, set->page_no);
1817
1818         if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId, 1) )
1819           return bt->err;
1820
1821         //      delete old lower key to our node
1822
1823         if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1824           return bt->err;
1825
1826         //      obtain delete and write locks to right node
1827
1828         bt_unlockpage (BtLockParent, right->latch);
1829         bt_lockpage (BtLockDelete, right->latch);
1830         bt_lockpage (BtLockWrite, right->latch);
1831         bt_freepage (bt, right);
1832
1833         bt_unlockpage (BtLockParent, set->latch);
1834         bt_unpinlatch (set->latch);
1835         bt_unpinpool (set->pool);
1836         bt->found = found;
1837         return 0;
1838 }
1839
1840 BtKey bt_foundkey (BtDb *bt)
1841 {
1842         return (BtKey)(bt->key);
1843 }
1844
1845 //      advance to next slot
1846
1847 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1848 {
1849 BtLatchSet *prevlatch;
1850 BtPool *prevpool;
1851 uid page_no;
1852
1853         if( slot < set->page->cnt )
1854                 return slot + 1;
1855
1856         prevlatch = set->latch;
1857         prevpool = set->pool;
1858
1859         if( page_no = bt_getid(set->page->right) )
1860                 set->latch = bt_pinlatch (bt, page_no);
1861         else
1862                 return bt->err = BTERR_struct, 0;
1863
1864         // pin page contents
1865
1866         if( set->pool = bt_pinpool (bt, page_no) )
1867                 set->page = bt_page (bt, set->pool, page_no);
1868         else
1869                 return 0;
1870
1871         // obtain access lock using lock chaining with Access mode
1872
1873         bt_lockpage(BtLockAccess, set->latch);
1874
1875         bt_unlockpage(BtLockRead, prevlatch);
1876         bt_unpinlatch (prevlatch);
1877         bt_unpinpool (prevpool);
1878
1879         bt_lockpage(BtLockRead, set->latch);
1880         bt_unlockpage(BtLockAccess, set->latch);
1881
1882         set->page_no = page_no;
1883         return 1;
1884 }
1885
1886 //      find unique key or first duplicate key in
1887 //      leaf level and return number of value bytes
1888 //      or (-1) if not found.  Setup key for bt_foundkey
1889
1890 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1891 {
1892 BtPageSet set[1];
1893 uint len, slot;
1894 int ret = -1;
1895 BtKey ptr;
1896 BtVal val;
1897
1898   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1899    do {
1900         ptr = keyptr(set->page, slot);
1901
1902         //      skip librarian slot place holder
1903
1904         if( slotptr(set->page, slot)->type == Librarian )
1905                 ptr = keyptr(set->page, ++slot);
1906
1907         //      return actual key found
1908
1909         memcpy (bt->key, ptr, ptr->len + 1);
1910         len = ptr->len;
1911
1912         if( slotptr(set->page, slot)->type == Duplicate )
1913                 len -= BtId;
1914
1915         // if key exists, return >= 0 value bytes copied
1916         //      otherwise return (-1)
1917
1918         if( slotptr(set->page, slot)->dead )
1919                 continue;
1920
1921         if( keylen == len )
1922           if( !memcmp (ptr->key, key, len) ) {
1923                 val = valptr (set->page,slot);
1924                 if( valmax > val->len )
1925                         valmax = val->len;
1926                 memcpy (value, val->value, valmax);
1927                 ret = valmax;
1928           }
1929
1930         break;
1931
1932    } while( slot = bt_findnext (bt, set, slot) );
1933
1934   bt_unlockpage (BtLockRead, set->latch);
1935   bt_unpinlatch (set->latch);
1936   bt_unpinpool (set->pool);
1937   return ret;
1938 }
1939
1940 //      check page for space available,
1941 //      clean if necessary and return
1942 //      0 - page needs splitting
1943 //      >0  new slot value
1944
1945 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1946 {
1947 uint nxt = bt->mgr->page_size;
1948 uint cnt = 0, idx = 0;
1949 uint max = page->cnt;
1950 uint newslot = max;
1951 BtKey key;
1952 BtVal val;
1953
1954         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1955                 return slot;
1956
1957         //      skip cleanup and proceed to split
1958         //      if there's not enough garbage
1959         //      to bother with.
1960
1961         if( page->garbage < nxt / 5 )
1962                 return 0;
1963
1964         memcpy (bt->frame, page, bt->mgr->page_size);
1965
1966         // skip page info and set rest of page to zero
1967
1968         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1969         page->garbage = 0;
1970         page->act = 0;
1971
1972         // clean up page first by
1973         // removing deleted keys
1974
1975         while( cnt++ < max ) {
1976                 if( cnt == slot )
1977                         newslot = idx + 2;
1978                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1979                         continue;
1980
1981                 // copy the value across
1982
1983                 val = valptr(bt->frame, cnt);
1984                 nxt -= val->len + 1;
1985                 ((unsigned char *)page)[nxt] = val->len;
1986                 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1987
1988                 // copy the key across
1989
1990                 key = keyptr(bt->frame, cnt);
1991                 nxt -= key->len + 1;
1992                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1993
1994                 // make a librarian slot
1995
1996                 if( idx ) {
1997                         slotptr(page, ++idx)->off = nxt;
1998                         slotptr(page, idx)->type = Librarian;
1999                         slotptr(page, idx)->dead = 1;
2000                 }
2001
2002                 // set up the slot
2003
2004                 slotptr(page, ++idx)->off = nxt;
2005                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
2006
2007                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
2008                         page->act++;
2009         }
2010
2011         page->min = nxt;
2012         page->cnt = idx;
2013
2014         //      see if page has enough space now, or does it need splitting?
2015
2016         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
2017                 return newslot;
2018
2019         return 0;
2020 }
2021
2022 // split the root and raise the height of the btree
2023
2024 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, BtPageSet *right)
2025 {
2026 uint nxt = bt->mgr->page_size;
2027 unsigned char value[BtId];
2028 BtPageSet left[1];
2029
2030         //  Obtain an empty page to use, and copy the current
2031         //  root contents into it, e.g. lower keys
2032
2033         if( bt_newpage(bt, left) )
2034                 return bt->err;
2035
2036         memcpy(left->page, root->page, bt->mgr->page_size);
2037         bt_unpinpool (left->pool);
2038
2039         // set left from higher to lower page
2040
2041         bt_putid (right->page->left, left->page_no);
2042         bt_unpinpool (right->pool);
2043
2044         // preserve the page info at the bottom
2045         // of higher keys and set rest to zero
2046
2047         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2048
2049         // insert lower keys page fence key on newroot page as first key
2050
2051         nxt -= BtId + 1;
2052         bt_putid (value, left->page_no);
2053         ((unsigned char *)root->page)[nxt] = BtId;
2054         memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
2055
2056         nxt -= *leftkey + 1;
2057         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
2058         slotptr(root->page, 1)->off = nxt;
2059         
2060         // insert stopper key on newroot page
2061         // and increase the root height
2062
2063         nxt -= 3 + BtId + 1;
2064         ((unsigned char *)root->page)[nxt] = 2;
2065         ((unsigned char *)root->page)[nxt+1] = 0xff;
2066         ((unsigned char *)root->page)[nxt+2] = 0xff;
2067
2068         bt_putid (value, right->page_no);
2069         ((unsigned char *)root->page)[nxt+3] = BtId;
2070         memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
2071         slotptr(root->page, 2)->off = nxt;
2072
2073         bt_putid(root->page->right, 0);
2074         root->page->min = nxt;          // reset lowest used offset and key count
2075         root->page->cnt = 2;
2076         root->page->act = 2;
2077         root->page->lvl++;
2078
2079         // release and unpin root
2080
2081         bt_unlockpage(BtLockWrite, root->latch);
2082         bt_unpinlatch (root->latch);
2083         bt_unpinpool (root->pool);
2084         return 0;
2085 }
2086
2087 //  split already locked full node
2088 //      return unlocked.
2089
2090 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2091 {
2092 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2093 unsigned char fencekey[256], rightkey[256];
2094 unsigned char value[BtId];
2095 uint lvl = set->page->lvl;
2096 BtPageSet right[1];
2097 uid right2;
2098 uint prev;
2099 BtKey key;
2100 BtVal val;
2101
2102         //  split higher half of keys to bt->frame
2103
2104         memset (bt->frame, 0, bt->mgr->page_size);
2105         max = set->page->cnt;
2106         cnt = max / 2;
2107         idx = 0;
2108
2109         while( cnt++ < max ) {
2110                 if( slotptr(set->page, cnt)->dead && cnt < max )
2111                         continue;
2112                 val = valptr(set->page, cnt);
2113                 nxt -= val->len + 1;
2114                 ((unsigned char *)bt->frame)[nxt] = val->len;
2115                 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
2116
2117                 key = keyptr(set->page, cnt);
2118                 nxt -= key->len + 1;
2119                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
2120
2121                 //      add librarian slot
2122
2123                 if( idx ) {
2124                         slotptr(bt->frame, ++idx)->off = nxt;
2125                         slotptr(bt->frame, idx)->type = Librarian;
2126                         slotptr(bt->frame, idx)->dead = 1;
2127                 }
2128
2129                 //  add actual slot
2130
2131                 slotptr(bt->frame, ++idx)->off = nxt;
2132                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2133
2134                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2135                         bt->frame->act++;
2136         }
2137
2138         // remember existing fence key for new page to the right
2139
2140         memcpy (rightkey, key, key->len + 1);
2141
2142         bt->frame->bits = bt->mgr->page_bits;
2143         bt->frame->min = nxt;
2144         bt->frame->cnt = idx;
2145         bt->frame->lvl = lvl;
2146
2147         // link right node
2148
2149         if( set->page_no > ROOT_page ) {
2150                 bt_putid (bt->frame->right, bt_getid (set->page->right));
2151                 bt_putid(bt->frame->left, set->page_no);
2152         }
2153
2154         //      get new free page and write higher keys to it.
2155
2156         if( bt_newpage(bt, right) )
2157                 return bt->err;
2158
2159         // link left node
2160
2161         if( set->page_no > ROOT_page && !lvl )
2162           if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
2163                 return bt->err;
2164
2165         memcpy (right->page, bt->frame, bt->mgr->page_size);
2166         bt_unpinpool (right->pool);
2167
2168         //      update lower keys to continue in old page
2169
2170         memcpy (bt->frame, set->page, bt->mgr->page_size);
2171         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2172         nxt = bt->mgr->page_size;
2173         set->page->garbage = 0;
2174         set->page->act = 0;
2175         max /= 2;
2176         cnt = 0;
2177         idx = 0;
2178
2179         if( slotptr(bt->frame, max)->type == Librarian )
2180                 max--;
2181
2182         //  assemble page of smaller keys
2183
2184         while( cnt++ < max ) {
2185                 if( slotptr(bt->frame, cnt)->dead )
2186                         continue;
2187                 val = valptr(bt->frame, cnt);
2188                 nxt -= val->len + 1;
2189                 ((unsigned char *)set->page)[nxt] = val->len;
2190                 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2191
2192                 key = keyptr(bt->frame, cnt);
2193                 nxt -= key->len + 1;
2194                 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2195
2196                 //      add librarian slot
2197
2198                 if( idx ) {
2199                         slotptr(set->page, ++idx)->off = nxt;
2200                         slotptr(set->page, idx)->type = Librarian;
2201                         slotptr(set->page, idx)->dead = 1;
2202                 }
2203
2204                 //      add actual slot
2205
2206                 slotptr(set->page, ++idx)->off = nxt;
2207                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2208                 set->page->act++;
2209         }
2210
2211         // remember fence key for smaller page
2212
2213         memcpy(fencekey, key, key->len + 1);
2214
2215         bt_putid(set->page->right, right->page_no);
2216         set->page->min = nxt;
2217         set->page->cnt = idx;
2218
2219         // if current page is the root page, split it
2220
2221         if( set->page_no == ROOT_page )
2222                 return bt_splitroot (bt, set, fencekey, right);
2223
2224         // insert new fences in their parent pages
2225
2226         right->latch = bt_pinlatch (bt, right->page_no);
2227         bt_lockpage (BtLockParent, right->latch);
2228
2229         bt_lockpage (BtLockParent, set->latch);
2230         bt_unlockpage (BtLockWrite, set->latch);
2231
2232         // insert new fence for reformulated left block of smaller keys
2233
2234         bt_putid (value, set->page_no);
2235
2236         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
2237                 return bt->err;
2238
2239         // switch fence for right block of larger keys to new right page
2240
2241         bt_putid (value, right->page_no);
2242
2243         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
2244                 return bt->err;
2245
2246         bt_unlockpage (BtLockParent, set->latch);
2247         bt_unpinlatch (set->latch);
2248         bt_unpinpool (set->pool);
2249
2250         bt_unlockpage (BtLockParent, right->latch);
2251         bt_unpinlatch (right->latch);
2252         return 0;
2253 }
2254
2255 //      install new key and value onto page
2256 //      page must already be checked for
2257 //      adequate space
2258
2259 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2260 {
2261 uint idx, librarian;
2262 BtSlot *node;
2263
2264         //      if found slot > desired slot and previous slot
2265         //      is a librarian slot, use it
2266
2267         if( slot > 1 )
2268           if( slotptr(set->page, slot-1)->type == Librarian )
2269                 slot--;
2270
2271         // copy value onto page
2272
2273         set->page->min -= vallen + 1; // reset lowest used offset
2274         ((unsigned char *)set->page)[set->page->min] = vallen;
2275         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2276
2277         // copy key onto page
2278
2279         set->page->min -= keylen + 1; // reset lowest used offset
2280         ((unsigned char *)set->page)[set->page->min] = keylen;
2281         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2282
2283         //      find first empty slot
2284
2285         for( idx = slot; idx < set->page->cnt; idx++ )
2286           if( slotptr(set->page, idx)->dead )
2287                 break;
2288
2289         // now insert key into array before slot
2290
2291         if( idx == set->page->cnt )
2292                 idx += 2, set->page->cnt += 2, librarian = 2;
2293         else
2294                 librarian = 1;
2295
2296         set->page->act++;
2297
2298         while( idx > slot + librarian - 1 )
2299                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2300
2301         //      add librarian slot
2302
2303         if( librarian > 1 ) {
2304                 node = slotptr(set->page, slot++);
2305                 node->off = set->page->min;
2306                 node->type = Librarian;
2307                 node->dead = 1;
2308         }
2309
2310         //      fill in new slot
2311
2312         node = slotptr(set->page, slot);
2313         node->off = set->page->min;
2314         node->type = type;
2315         node->dead = 0;
2316
2317         bt_unlockpage (BtLockWrite, set->latch);
2318         bt_unpinlatch (set->latch);
2319         bt_unpinpool (set->pool);
2320         return 0;
2321 }
2322
2323 //  Insert new key into the btree at given level.
2324 //      either add a new key or update/add an existing one
2325
2326 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2327 {
2328 unsigned char newkey[256];
2329 uint slot, idx, len;
2330 BtPageSet set[1];
2331 uid sequence;
2332 BtKey ptr;
2333 BtVal val;
2334 uint type;
2335
2336   // set up the key we're working on
2337
2338   memcpy (newkey + 1, key, keylen);
2339   newkey[0] = keylen;
2340
2341   // is this a non-unique index value?
2342
2343   if( unique )
2344         type = Unique;
2345   else {
2346         type = Duplicate;
2347         sequence = bt_newdup (bt);
2348         bt_putid (newkey + *newkey + 1, sequence);
2349         *newkey += BtId;
2350   }
2351
2352   while( 1 ) { // find the page and slot for the current key
2353         if( slot = bt_loadpage (bt, set, newkey + 1, *newkey, lvl, BtLockWrite) )
2354                 ptr = keyptr(set->page, slot);
2355         else {
2356                 if( !bt->err )
2357                         bt->err = BTERR_ovflw;
2358                 return bt->err;
2359         }
2360
2361         // if librarian slot == found slot, advance to real slot
2362
2363         if( slotptr(set->page, slot)->type == Librarian )
2364           if( !keycmp (ptr, key, keylen) )
2365                 ptr = keyptr(set->page, ++slot);
2366
2367         len = ptr->len;
2368
2369         if( slotptr(set->page, slot)->type == Duplicate )
2370                 len -= BtId;
2371
2372         //  if inserting a duplicate key or unique key
2373         //      check for adequate space on the page
2374         //      and insert the new key before slot.
2375
2376         if( unique && (len != *newkey || memcmp (ptr->key, newkey+1, *newkey)) || !unique ) {
2377           if( !(slot = bt_cleanpage (bt, set->page, *newkey, slot, vallen)) )
2378                 if( bt_splitpage (bt, set) )
2379                   return bt->err;
2380                 else
2381                   continue;
2382
2383           return bt_insertslot (bt, set, slot, newkey + 1, *newkey, value, vallen, type);
2384         }
2385
2386         // if key already exists, update value and return
2387
2388         if( val = valptr(set->page, slot), val->len >= vallen ) {
2389                 if( slotptr(set->page, slot)->dead )
2390                         set->page->act++;
2391                 set->page->garbage += val->len - vallen;
2392                 slotptr(set->page, slot)->dead = 0;
2393                 val->len = vallen;
2394                 memcpy (val->value, value, vallen);
2395                 bt_unlockpage(BtLockWrite, set->latch);
2396                 bt_unpinlatch (set->latch);
2397                 bt_unpinpool (set->pool);
2398                 return 0;
2399         }
2400
2401         //      new update value doesn't fit in existing value area
2402
2403         if( !slotptr(set->page, slot)->dead )
2404                 set->page->garbage += val->len + ptr->len + 2;
2405         else {
2406                 slotptr(set->page, slot)->dead = 0;
2407                 set->page->act++;
2408         }
2409
2410         if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2411           if( bt_splitpage (bt, set) )
2412                 return bt->err;
2413           else
2414                 continue;
2415
2416         set->page->min -= vallen + 1;
2417         ((unsigned char *)set->page)[set->page->min] = vallen;
2418         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
2419
2420         set->page->min -= keylen + 1;
2421         ((unsigned char *)set->page)[set->page->min] = keylen;
2422         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
2423         
2424         slotptr(set->page, slot)->off = set->page->min;
2425         bt_unlockpage(BtLockWrite, set->latch);
2426         bt_unpinlatch (set->latch);
2427         bt_unpinpool (set->pool);
2428         return 0;
2429   }
2430   return 0;
2431 }
2432
2433 //      set cursor to highest slot on highest page
2434
2435 uint bt_lastkey (BtDb *bt)
2436 {
2437 uid page_no = bt_getid (bt->mgr->latchmgr->alloc->left);
2438 BtPageSet set[1];
2439 uint slot;
2440
2441         if( set->pool = bt_pinpool (bt, page_no) )
2442                 set->page = bt_page (bt, set->pool, page_no);
2443         else
2444                 return 0;
2445
2446         set->latch = bt_pinlatch (bt, page_no);
2447     bt_lockpage(BtLockRead, set->latch);
2448
2449         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2450         slot = set->page->cnt;
2451
2452     bt_unlockpage(BtLockRead, set->latch);
2453         bt_unpinlatch (set->latch);
2454         bt_unpinpool (set->pool);
2455
2456         return slot;
2457 }
2458
2459 //      return previous slot on cursor page
2460
2461 uint bt_prevkey (BtDb *bt, uint slot)
2462 {
2463 BtPageSet set[1];
2464 uid left;
2465
2466         if( --slot )
2467                 return slot;
2468
2469         if( left = bt_getid(bt->cursor->left) )
2470                 bt->cursor_page = left;
2471         else
2472                 return 0;
2473
2474         if( set->pool = bt_pinpool (bt, left) )
2475                 set->page = bt_page (bt, set->pool, left);
2476         else
2477                 return 0;
2478
2479         set->latch = bt_pinlatch (bt, left);
2480     bt_lockpage(BtLockRead, set->latch);
2481
2482         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2483
2484         bt_unlockpage(BtLockRead, set->latch);
2485         bt_unpinlatch (set->latch);
2486         bt_unpinpool (set->pool);
2487         return bt->cursor->cnt;
2488 }
2489
2490 //  return next slot on cursor page
2491 //  or slide cursor right into next page
2492
2493 uint bt_nextkey (BtDb *bt, uint slot)
2494 {
2495 BtPageSet set[1];
2496 uid right;
2497
2498   do {
2499         right = bt_getid(bt->cursor->right);
2500
2501         while( slot++ < bt->cursor->cnt )
2502           if( slotptr(bt->cursor,slot)->dead )
2503                 continue;
2504           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2505                 return slot;
2506           else
2507                 break;
2508
2509         if( !right )
2510                 break;
2511
2512         bt->cursor_page = right;
2513
2514         if( set->pool = bt_pinpool (bt, right) )
2515                 set->page = bt_page (bt, set->pool, right);
2516         else
2517                 return 0;
2518
2519         set->latch = bt_pinlatch (bt, right);
2520     bt_lockpage(BtLockRead, set->latch);
2521
2522         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2523
2524         bt_unlockpage(BtLockRead, set->latch);
2525         bt_unpinlatch (set->latch);
2526         bt_unpinpool (set->pool);
2527         slot = 0;
2528
2529   } while( 1 );
2530
2531   return bt->err = 0;
2532 }
2533
2534 //  cache page of keys into cursor and return starting slot for given key
2535
2536 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2537 {
2538 BtPageSet set[1];
2539 uint slot;
2540
2541         // cache page for retrieval
2542
2543         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2544           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2545         else
2546           return 0;
2547
2548         bt->cursor_page = set->page_no;
2549
2550         bt_unlockpage(BtLockRead, set->latch);
2551         bt_unpinlatch (set->latch);
2552         bt_unpinpool (set->pool);
2553         return slot;
2554 }
2555
2556 BtKey bt_key(BtDb *bt, uint slot)
2557 {
2558         return keyptr(bt->cursor, slot);
2559 }
2560
2561 BtVal bt_val(BtDb *bt, uint slot)
2562 {
2563         return valptr(bt->cursor,slot);
2564 }
2565
2566 #ifdef STANDALONE
2567
2568 #ifndef unix
2569 double getCpuTime(int type)
2570 {
2571 FILETIME crtime[1];
2572 FILETIME xittime[1];
2573 FILETIME systime[1];
2574 FILETIME usrtime[1];
2575 SYSTEMTIME timeconv[1];
2576 double ans = 0;
2577
2578         memset (timeconv, 0, sizeof(SYSTEMTIME));
2579
2580         switch( type ) {
2581         case 0:
2582                 GetSystemTimeAsFileTime (xittime);
2583                 FileTimeToSystemTime (xittime, timeconv);
2584                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2585                 break;
2586         case 1:
2587                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2588                 FileTimeToSystemTime (usrtime, timeconv);
2589                 break;
2590         case 2:
2591                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2592                 FileTimeToSystemTime (systime, timeconv);
2593                 break;
2594         }
2595
2596         ans += (double)timeconv->wHour * 3600;
2597         ans += (double)timeconv->wMinute * 60;
2598         ans += (double)timeconv->wSecond;
2599         ans += (double)timeconv->wMilliseconds / 1000;
2600         return ans;
2601 }
2602 #else
2603 #include <time.h>
2604 #include <sys/resource.h>
2605
2606 double getCpuTime(int type)
2607 {
2608 struct rusage used[1];
2609 struct timeval tv[1];
2610
2611         switch( type ) {
2612         case 0:
2613                 gettimeofday(tv, NULL);
2614                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2615
2616         case 1:
2617                 getrusage(RUSAGE_SELF, used);
2618                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2619
2620         case 2:
2621                 getrusage(RUSAGE_SELF, used);
2622                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2623         }
2624
2625         return 0;
2626 }
2627 #endif
2628
2629 uint bt_latchaudit (BtDb *bt)
2630 {
2631 ushort idx, hashidx;
2632 uid next, page_no;
2633 BtLatchSet *latch;
2634 uint cnt = 0;
2635 BtKey ptr;
2636
2637 #ifdef unix
2638         posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2639 #endif
2640         if( *(ushort *)(bt->mgr->latchmgr->lock) )
2641                 fprintf(stderr, "Alloc page locked\n");
2642         *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2643
2644         for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2645                 latch = bt->mgr->latchsets + idx;
2646                 if( *latch->readwr->rin & MASK )
2647                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2648                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2649
2650                 if( *latch->access->rin & MASK )
2651                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2652                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2653
2654                 if( *latch->parent->rin & MASK )
2655                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2656                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2657
2658                 if( latch->pin ) {
2659                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2660                         latch->pin = 0;
2661                 }
2662         }
2663
2664         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2665           if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2666                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2667
2668           *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2669
2670           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2671                 latch = bt->mgr->latchsets + idx;
2672                 if( *(ushort *)latch->busy )
2673                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2674                 *(ushort *)latch->busy = 0;
2675                 if( latch->pin )
2676                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2677           } while( idx = latch->next );
2678         }
2679
2680         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2681         page_no = LEAF_page;
2682
2683         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2684         off64_t off = page_no << bt->mgr->page_bits;
2685 #ifdef unix
2686                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2687 #else
2688                 DWORD amt[1];
2689
2690                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2691
2692                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2693                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2694
2695                   if( *amt <  bt->mgr->page_size )
2696                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2697 #endif
2698                 if( !bt->frame->free ) {
2699                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2700                   ptr = keyptr(bt->frame, idx+1);
2701                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2702                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2703                  }
2704                  if( !bt->frame->lvl )
2705                         cnt += bt->frame->act;
2706                 }
2707                 if( page_no > LEAF_page )
2708                         next = page_no + 1;
2709                 page_no = next;
2710         }
2711         return cnt - 1;
2712 }
2713
2714 typedef struct {
2715         char idx;
2716         char *type;
2717         char *infile;
2718         BtMgr *mgr;
2719         int num;
2720 } ThreadArg;
2721
2722 //  standalone program to index file of keys
2723 //  then list them onto std-out
2724
2725 #ifdef unix
2726 void *index_file (void *arg)
2727 #else
2728 uint __stdcall index_file (void *arg)
2729 #endif
2730 {
2731 int line = 0, found = 0, cnt = 0;
2732 uid next, page_no = LEAF_page;  // start on first page of leaves
2733 unsigned char key[256];
2734 ThreadArg *args = arg;
2735 int ch, len = 0, slot;
2736 BtPageSet set[1];
2737 BtKey ptr;
2738 BtDb *bt;
2739 FILE *in;
2740
2741         bt = bt_open (args->mgr);
2742
2743         switch(args->type[0] | 0x20)
2744         {
2745         case 'a':
2746                 fprintf(stderr, "started latch mgr audit\n");
2747                 cnt = bt_latchaudit (bt);
2748                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2749                 break;
2750
2751         case 'p':
2752                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2753                 if( in = fopen (args->infile, "rb") )
2754                   while( ch = getc(in), ch != EOF )
2755                         if( ch == '\n' )
2756                         {
2757                           line++;
2758
2759                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 0) )
2760                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2761                           len = 0;
2762                         }
2763                         else if( len < 255 )
2764                                 key[len++] = ch;
2765                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2766                 break;
2767
2768         case 'w':
2769                 fprintf(stderr, "started indexing for %s\n", args->infile);
2770                 if( in = fopen (args->infile, "rb") )
2771                   while( ch = getc(in), ch != EOF )
2772                         if( ch == '\n' )
2773                         {
2774                           line++;
2775
2776                           if( args->num == 1 )
2777                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2778
2779                           else if( args->num )
2780                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2781
2782                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 0) )
2783                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2784                           len = 0;
2785                         }
2786                         else if( len < 255 )
2787                                 key[len++] = ch;
2788                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2789                 break;
2790
2791         case 'd':
2792                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2793                 if( in = fopen (args->infile, "rb") )
2794                   while( ch = getc(in), ch != EOF )
2795                         if( ch == '\n' )
2796                         {
2797                           line++;
2798                           if( args->num == 1 )
2799                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2800
2801                           else if( args->num )
2802                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2803
2804                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2805                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2806                           ptr = (BtKey)(bt->key);
2807                           found++;
2808
2809                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2810                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2811                           len = 0;
2812                         }
2813                         else if( len < 255 )
2814                                 key[len++] = ch;
2815                 fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
2816                 break;
2817
2818         case 'f':
2819                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2820                 if( in = fopen (args->infile, "rb") )
2821                   while( ch = getc(in), ch != EOF )
2822                         if( ch == '\n' )
2823                         {
2824                           line++;
2825                           if( args->num == 1 )
2826                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2827
2828                           else if( args->num )
2829                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2830
2831                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2832                                 found++;
2833                           else if( bt->err )
2834                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2835                           len = 0;
2836                         }
2837                         else if( len < 255 )
2838                                 key[len++] = ch;
2839                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2840                 break;
2841
2842         case 's':
2843                 fprintf(stderr, "started scanning\n");
2844                 do {
2845                         if( set->pool = bt_pinpool (bt, page_no) )
2846                                 set->page = bt_page (bt, set->pool, page_no);
2847                         else
2848                                 break;
2849                         set->latch = bt_pinlatch (bt, page_no);
2850                         bt_lockpage (BtLockRead, set->latch);
2851                         next = bt_getid (set->page->right);
2852
2853                         for( slot = 0; slot++ < set->page->cnt; )
2854                          if( next || slot < set->page->cnt )
2855                           if( !slotptr(set->page, slot)->dead ) {
2856                                 ptr = keyptr(set->page, slot);
2857                                 len = ptr->len;
2858
2859                             if( slotptr(set->page, slot)->type == Duplicate )
2860                                         len -= BtId;
2861
2862                                 fwrite (ptr->key, len, 1, stdout);
2863                                 fputc ('\n', stdout);
2864                                 cnt++;
2865                            }
2866
2867                         bt_unlockpage (BtLockRead, set->latch);
2868                         bt_unpinlatch (set->latch);
2869                         bt_unpinpool (set->pool);
2870                 } while( page_no = next );
2871
2872                 fprintf(stderr, " Total keys read %d\n", cnt);
2873                 break;
2874
2875         case 'r':
2876                 fprintf(stderr, "started reverse scan\n");
2877                 if( slot = bt_lastkey (bt) )
2878                    while( slot = bt_prevkey (bt, slot) ) {
2879                         if( slotptr(bt->cursor, slot)->dead )
2880                           continue;
2881
2882                         ptr = keyptr(bt->cursor, slot);
2883                         len = ptr->len;
2884
2885                         if( slotptr(bt->cursor, slot)->type == Duplicate )
2886                                 len -= BtId;
2887
2888                         fwrite (ptr->key, len, 1, stdout);
2889                         fputc ('\n', stdout);
2890                         cnt++;
2891                   }
2892
2893                 fprintf(stderr, " Total keys read %d\n", cnt);
2894                 break;
2895
2896         case 'c':
2897 #ifdef unix
2898                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2899 #endif
2900                 fprintf(stderr, "started counting\n");
2901                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2902                 page_no = LEAF_page;
2903
2904                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2905                 uid off = page_no << bt->mgr->page_bits;
2906 #ifdef unix
2907                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2908 #else
2909                 DWORD amt[1];
2910
2911                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2912
2913                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2914                         return bt->err = BTERR_map;
2915
2916                   if( *amt <  bt->mgr->page_size )
2917                         return bt->err = BTERR_map;
2918 #endif
2919                         if( !bt->frame->free && !bt->frame->lvl )
2920                                 cnt += bt->frame->act;
2921                         if( page_no > LEAF_page )
2922                                 next = page_no + 1;
2923                         page_no = next;
2924                 }
2925                 
2926                 cnt--;  // remove stopper key
2927                 fprintf(stderr, " Total keys read %d\n", cnt);
2928                 break;
2929         }
2930
2931         bt_close (bt);
2932 #ifdef unix
2933         return NULL;
2934 #else
2935         return 0;
2936 #endif
2937 }
2938
2939 typedef struct timeval timer;
2940
2941 int main (int argc, char **argv)
2942 {
2943 int idx, cnt, len, slot, err;
2944 int segsize, bits = 16;
2945 double start, stop;
2946 #ifdef unix
2947 pthread_t *threads;
2948 #else
2949 HANDLE *threads;
2950 #endif
2951 ThreadArg *args;
2952 uint poolsize = 0;
2953 float elapsed;
2954 int num = 0;
2955 char key[1];
2956 BtMgr *mgr;
2957 BtKey ptr;
2958 BtDb *bt;
2959
2960         if( argc < 3 ) {
2961                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2962                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2963                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2964                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2965                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2966                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2967                 exit(0);
2968         }
2969
2970         start = getCpuTime(0);
2971
2972         if( argc > 3 )
2973                 bits = atoi(argv[3]);
2974
2975         if( argc > 4 )
2976                 poolsize = atoi(argv[4]);
2977
2978         if( !poolsize )
2979                 fprintf (stderr, "Warning: no mapped_pool\n");
2980
2981         if( poolsize > 65535 )
2982                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2983
2984         if( argc > 5 )
2985                 segsize = atoi(argv[5]);
2986         else
2987                 segsize = 4;    // 16 pages per mmap segment
2988
2989         if( argc > 6 )
2990                 num = atoi(argv[6]);
2991
2992         cnt = argc - 7;
2993 #ifdef unix
2994         threads = malloc (cnt * sizeof(pthread_t));
2995 #else
2996         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2997 #endif
2998         args = malloc (cnt * sizeof(ThreadArg));
2999
3000         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
3001
3002         if( !mgr ) {
3003                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3004                 exit (1);
3005         }
3006
3007         //      fire off threads
3008
3009         for( idx = 0; idx < cnt; idx++ ) {
3010                 args[idx].infile = argv[idx + 7];
3011                 args[idx].type = argv[2];
3012                 args[idx].mgr = mgr;
3013                 args[idx].num = num;
3014                 args[idx].idx = idx;
3015 #ifdef unix
3016                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3017                         fprintf(stderr, "Error creating thread %d\n", err);
3018 #else
3019                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3020 #endif
3021         }
3022
3023         //      wait for termination
3024
3025 #ifdef unix
3026         for( idx = 0; idx < cnt; idx++ )
3027                 pthread_join (threads[idx], NULL);
3028 #else
3029         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3030
3031         for( idx = 0; idx < cnt; idx++ )
3032                 CloseHandle(threads[idx]);
3033
3034 #endif
3035         elapsed = getCpuTime(0) - start;
3036         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3037         elapsed = getCpuTime(1);
3038         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3039         elapsed = getCpuTime(2);
3040         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3041
3042         bt_mgrclose (mgr);
3043 }
3044
3045 #endif  //STANDALONE