]> pd.if.org Git - btree/blob - threadskv4b.c
Introduce threadskv7 that includes atomic insert of a set of keys with values
[btree] / threadskv4b.c
1 // btree version threadskv4b sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      and duplicate key management
6
7 // 08 SEP 2014
8
9 // author: karl malbrain, malbrain@cal.berkeley.edu
10
11 /*
12 This work, including the source code, documentation
13 and related data, is placed into the public domain.
14
15 The orginal author is Karl Malbrain.
16
17 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
18 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
19 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
20 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
21 RESULTING FROM THE USE, MODIFICATION, OR
22 REDISTRIBUTION OF THIS SOFTWARE.
23 */
24
25 // Please see the project home page for documentation
26 // code.google.com/p/high-concurrency-btree
27
28 #define _FILE_OFFSET_BITS 64
29 #define _LARGEFILE64_SOURCE
30
31 #ifdef linux
32 #define _GNU_SOURCE
33 #endif
34
35 #ifdef unix
36 #include <unistd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <fcntl.h>
40 #include <sys/time.h>
41 #include <sys/mman.h>
42 #include <errno.h>
43 #include <pthread.h>
44 #else
45 #define WIN32_LEAN_AND_MEAN
46 #include <windows.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <time.h>
50 #include <fcntl.h>
51 #include <process.h>
52 #include <intrin.h>
53 #endif
54
55 #include <memory.h>
56 #include <string.h>
57 #include <stddef.h>
58
59 typedef unsigned long long      uid;
60
61 #ifndef unix
62 typedef unsigned long long      off64_t;
63 typedef unsigned short          ushort;
64 typedef unsigned int            uint;
65 #endif
66
67 #define BT_latchtable   128                                     // number of latch manager slots
68
69 #define BT_ro 0x6f72    // ro
70 #define BT_rw 0x7772    // rw
71
72 #define BT_maxbits              24                                      // maximum page size in bits
73 #define BT_minbits              9                                       // minimum page size in bits
74 #define BT_minpage              (1 << BT_minbits)       // minimum page size
75 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
76
77 /*
78 There are five lock types for each node in three independent sets: 
79 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
80 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
81 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
82 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
83 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
84 */
85
86 typedef enum{
87         BtLockAccess,
88         BtLockDelete,
89         BtLockRead,
90         BtLockWrite,
91         BtLockParent
92 } BtLock;
93
94 //      definition for phase-fair reader/writer lock implementation
95
96 typedef struct {
97         ushort rin[1];
98         ushort rout[1];
99         ushort ticket[1];
100         ushort serving[1];
101 } RWLock;
102
103 #define PHID 0x1
104 #define PRES 0x2
105 #define MASK 0x3
106 #define RINC 0x4
107
108 //      definition for spin latch implementation
109
110 // exclusive is set for write access
111 // share is count of read accessors
112 // grant write lock when share == 0
113
114 volatile typedef struct {
115         ushort exclusive:1;
116         ushort pending:1;
117         ushort share:14;
118 } BtSpinLatch;
119
120 #define XCL 1
121 #define PEND 2
122 #define BOTH 3
123 #define SHARE 4
124
125 //  hash table entries
126
127 typedef struct {
128         BtSpinLatch latch[1];
129         volatile ushort slot;           // Latch table entry at head of chain
130 } BtHashEntry;
131
132 //      latch manager table structure
133
134 typedef struct {
135         RWLock readwr[1];               // read/write page lock
136         RWLock access[1];               // Access Intent/Page delete
137         RWLock parent[1];               // Posting of fence key in parent
138         BtSpinLatch busy[1];            // slot is being moved between chains
139         volatile ushort next;           // next entry in hash table chain
140         volatile ushort prev;           // prev entry in hash table chain
141         volatile ushort pin;            // number of outstanding locks
142         volatile ushort hash;           // hash slot entry is under
143         volatile uid page_no;           // latch set page number
144 } BtLatchSet;
145
146 //      Define the length of the page and key pointers
147
148 #define BtId 6
149
150 //      Page key slot definition.
151
152 //      Keys are marked dead, but remain on the page until
153 //      it cleanup is called. The fence key (highest key) for
154 //      a leaf page is always present, even after cleanup.
155
156 //      Slot types
157
158 //      In addition to the Unique keys that occupy slots
159 //      there are Librarian and Duplicate key
160 //      slots occupying the key slot array.
161
162 //      The Librarian slots are dead keys that
163 //      serve as filler, available to add new Unique
164 //      or Dup slots that are inserted into the B-tree.
165
166 //      The Duplicate slots have had their key bytes extended
167 //      by 6 bytes to contain a binary duplicate key uniqueifier.
168
169 typedef enum {
170         Unique,
171         Librarian,
172         Duplicate
173 } BtSlotType;
174
175 typedef struct {
176         uint off:BT_maxbits;    // page offset for key start
177         uint type:3;                    // type of slot
178         uint dead:1;                    // set for deleted slot
179 } BtSlot;
180
181 //      The key structure occupies space at the upper end of
182 //      each page.  It's a length byte followed by the key
183 //      bytes.
184
185 typedef struct {
186         unsigned char len;
187         unsigned char key[1];
188 } *BtKey;
189
190 //      the value structure also occupies space at the upper
191 //      end of the page. Each key is immediately followed by a value.
192
193 typedef struct {
194         unsigned char len;
195         unsigned char value[1];
196 } *BtVal;
197
198 //      The first part of an index page.
199 //      It is immediately followed
200 //      by the BtSlot array of keys.
201
202 //      note that this structure size
203 //      must be a multiple of 8 bytes
204 //      in order to place dups correctly.
205
206 typedef struct BtPage_ {
207         uint cnt;                                       // count of keys in page
208         uint act;                                       // count of active keys
209         uint min;                                       // next key offset
210         uint garbage;                           // page garbage in bytes
211         unsigned char bits:7;           // page size in bits
212         unsigned char free:1;           // page is on free chain
213         unsigned char lvl:7;            // level of page
214         unsigned char kill:1;           // page is being deleted
215         unsigned char right[BtId];      // page number to right
216 } *BtPage;
217
218 //      The memory mapping pool table buffer manager entry
219
220 typedef struct {
221         uid  basepage;                          // mapped base page number
222         char *map;                                      // mapped memory pointer
223         ushort slot;                            // slot index in this array
224         ushort pin;                                     // mapped page pin counter
225         void *hashprev;                         // previous pool entry for the same hash idx
226         void *hashnext;                         // next pool entry for the same hash idx
227 #ifndef unix
228         HANDLE hmap;                            // Windows memory mapping handle
229 #endif
230 } BtPool;
231
232 #define CLOCK_bit 0x8000                // bit in pool->pin
233
234 //  The loadpage interface object
235
236 typedef struct {
237         uid page_no;            // current page number
238         BtPage page;            // current page pointer
239         BtPool *pool;           // current page pool
240         BtLatchSet *latch;      // current page latch set
241 } BtPageSet;
242
243 //      structure for latch manager on ALLOC_page
244
245 typedef struct {
246         struct BtPage_ alloc[1];        // next page_no in right ptr
247         unsigned long long dups[1];     // global duplicate key uniqueifier
248         unsigned char chain[BtId];      // head of free page_nos chain
249         BtSpinLatch lock[1];            // allocation area lite latch
250         ushort latchdeployed;           // highest number of latch entries deployed
251         ushort nlatchpage;                      // number of latch pages at BT_latch
252         ushort latchtotal;                      // number of page latch entries
253         ushort latchhash;                       // number of latch hash table slots
254         ushort latchvictim;                     // next latch entry to examine
255         BtHashEntry table[0];           // the hash table
256 } BtLatchMgr;
257
258 //      The object structure for Btree access
259
260 typedef struct {
261         uint page_size;                         // page size    
262         uint page_bits;                         // page size in bits    
263         uint seg_bits;                          // seg size in pages in bits
264         uint mode;                                      // read-write mode
265 #ifdef unix
266         int idx;
267 #else
268         HANDLE idx;
269 #endif
270         ushort poolcnt;                         // highest page pool node in use
271         ushort poolmax;                         // highest page pool node allocated
272         ushort poolmask;                        // total number of pages in mmap segment - 1
273         ushort hashsize;                        // size of Hash Table for pool entries
274         volatile uint evicted;          // last evicted hash table slot
275         ushort *hash;                           // pool index for hash entries
276         BtSpinLatch *latch;                     // latches for hash table slots
277         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
278         BtLatchSet *latchsets;          // mapped latch set from latch pages
279         BtPool *pool;                           // memory pool page segments
280 #ifndef unix
281         HANDLE halloc;                          // allocation and latch table handle
282 #endif
283 } BtMgr;
284
285 typedef struct {
286         BtMgr *mgr;                             // buffer manager for thread
287         BtPage cursor;                  // cached frame for start/next (never mapped)
288         BtPage frame;                   // spare frame for the page split (never mapped)
289         uid cursor_page;                // current cursor page number   
290         unsigned char *mem;             // frame, cursor, page memory buffer
291         unsigned char key[256]; // last found complete key
292         int found;                              // last delete or insert was found
293         int err;                                // last error
294 } BtDb;
295
296 typedef enum {
297         BTERR_ok = 0,
298         BTERR_struct,
299         BTERR_ovflw,
300         BTERR_lock,
301         BTERR_map,
302         BTERR_wrt,
303         BTERR_hash
304 } BTERR;
305
306 // B-Tree functions
307 extern void bt_close (BtDb *bt);
308 extern BtDb *bt_open (BtMgr *mgr);
309 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
310 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
311 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
312 extern BtKey bt_foundkey (BtDb *bt);
313 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
314 extern uint bt_nextkey   (BtDb *bt, uint slot);
315
316 //      manager functions
317 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
318 void bt_mgrclose (BtMgr *mgr);
319
320 //  Helper functions to return slot values
321 //      from the cursor page.
322
323 extern BtKey bt_key (BtDb *bt, uint slot);
324 extern BtVal bt_val (BtDb *bt, uint slot);
325
326 //  BTree page number constants
327 #define ALLOC_page              0       // allocation & latch manager hash table
328 #define ROOT_page               1       // root of the btree
329 #define LEAF_page               2       // first page of leaves
330 #define LATCH_page              3       // pages for latch manager
331
332 //      Number of levels to create in a new BTree
333
334 #define MIN_lvl                 2
335
336 //  The page is allocated from low and hi ends.
337 //  The key slots are allocated from the bottom,
338 //      while the text and value of the key
339 //  are allocated from the top.  When the two
340 //  areas meet, the page is split into two.
341
342 //  A key consists of a length byte, two bytes of
343 //  index number (0 - 65534), and up to 253 bytes
344 //  of key value.
345
346 //  Associated with each key is a value byte string
347 //      containing any value desired.
348
349 //  The b-tree root is always located at page 1.
350 //      The first leaf page of level zero is always
351 //      located on page 2.
352
353 //      The b-tree pages are linked with next
354 //      pointers to facilitate enumerators,
355 //      and provide for concurrency.
356
357 //      When to root page fills, it is split in two and
358 //      the tree height is raised by a new root at page
359 //      one with two keys.
360
361 //      Deleted keys are marked with a dead bit until
362 //      page cleanup. The fence key for a leaf node is
363 //      always present
364
365 //  Groups of pages called segments from the btree are optionally
366 //  cached with a memory mapped pool. A hash table is used to keep
367 //  track of the cached segments.  This behaviour is controlled
368 //  by the cache block size parameter to bt_open.
369
370 //  To achieve maximum concurrency one page is locked at a time
371 //  as the tree is traversed to find leaf key in question. The right
372 //  page numbers are used in cases where the page is being split,
373 //      or consolidated.
374
375 //  Page 0 is dedicated to lock for new page extensions,
376 //      and chains empty pages together for reuse. It also
377 //      contains the latch manager hash table.
378
379 //      The ParentModification lock on a node is obtained to serialize posting
380 //      or changing the fence key for a node.
381
382 //      Empty pages are chained together through the ALLOC page and reused.
383
384 //      Access macros to address slot and key values from the page
385 //      Page slots use 1 based indexing.
386
387 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
388 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
389 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
390
391 void bt_putid(unsigned char *dest, uid id)
392 {
393 int i = BtId;
394
395         while( i-- )
396                 dest[i] = (unsigned char)id, id >>= 8;
397 }
398
399 uid bt_getid(unsigned char *src)
400 {
401 uid id = 0;
402 int i;
403
404         for( i = 0; i < BtId; i++ )
405                 id <<= 8, id |= *src++; 
406
407         return id;
408 }
409
410 uid bt_newdup (BtDb *bt)
411 {
412 #ifdef unix
413         return __sync_fetch_and_add (bt->mgr->latchmgr->dups, 1) + 1;
414 #else
415         return _InterlockedIncrement64(bt->mgr->latchmgr->dups, 1);
416 #endif
417 }
418
419 //      Phase-Fair reader/writer lock implementation
420
421 void WriteLock (RWLock *lock)
422 {
423 ushort w, r, tix;
424
425 #ifdef unix
426         tix = __sync_fetch_and_add (lock->ticket, 1);
427 #else
428         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
429 #endif
430         // wait for our ticket to come up
431
432         while( tix != lock->serving[0] )
433 #ifdef unix
434                 sched_yield();
435 #else
436                 SwitchToThread ();
437 #endif
438
439         w = PRES | (tix & PHID);
440 #ifdef  unix
441         r = __sync_fetch_and_add (lock->rin, w);
442 #else
443         r = _InterlockedExchangeAdd16 (lock->rin, w);
444 #endif
445         while( r != *lock->rout )
446 #ifdef unix
447                 sched_yield();
448 #else
449                 SwitchToThread();
450 #endif
451 }
452
453 void WriteRelease (RWLock *lock)
454 {
455 #ifdef unix
456         __sync_fetch_and_and (lock->rin, ~MASK);
457 #else
458         _InterlockedAnd16 (lock->rin, ~MASK);
459 #endif
460         lock->serving[0]++;
461 }
462
463 void ReadLock (RWLock *lock)
464 {
465 ushort w;
466 #ifdef unix
467         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
468 #else
469         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
470 #endif
471         if( w )
472           while( w == (*lock->rin & MASK) )
473 #ifdef unix
474                 sched_yield ();
475 #else
476                 SwitchToThread ();
477 #endif
478 }
479
480 void ReadRelease (RWLock *lock)
481 {
482 #ifdef unix
483         __sync_fetch_and_add (lock->rout, RINC);
484 #else
485         _InterlockedExchangeAdd16 (lock->rout, RINC);
486 #endif
487 }
488
489 //      Spin Latch Manager
490
491 //      wait until write lock mode is clear
492 //      and add 1 to the share count
493
494 void bt_spinreadlock(BtSpinLatch *latch)
495 {
496 ushort prev;
497
498   do {
499 #ifdef unix
500         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
501 #else
502         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
503 #endif
504         //  see if exclusive request is granted or pending
505
506         if( !(prev & BOTH) )
507                 return;
508 #ifdef unix
509         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
510 #else
511         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
512 #endif
513 #ifdef  unix
514   } while( sched_yield(), 1 );
515 #else
516   } while( SwitchToThread(), 1 );
517 #endif
518 }
519
520 //      wait for other read and write latches to relinquish
521
522 void bt_spinwritelock(BtSpinLatch *latch)
523 {
524 ushort prev;
525
526   do {
527 #ifdef  unix
528         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
529 #else
530         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
531 #endif
532         if( !(prev & XCL) )
533           if( !(prev & ~BOTH) )
534                 return;
535           else
536 #ifdef unix
537                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
538 #else
539                 _InterlockedAnd16((ushort *)latch, ~XCL);
540 #endif
541 #ifdef  unix
542   } while( sched_yield(), 1 );
543 #else
544   } while( SwitchToThread(), 1 );
545 #endif
546 }
547
548 //      try to obtain write lock
549
550 //      return 1 if obtained,
551 //              0 otherwise
552
553 int bt_spinwritetry(BtSpinLatch *latch)
554 {
555 ushort prev;
556
557 #ifdef  unix
558         prev = __sync_fetch_and_or((ushort *)latch, XCL);
559 #else
560         prev = _InterlockedOr16((ushort *)latch, XCL);
561 #endif
562         //      take write access if all bits are clear
563
564         if( !(prev & XCL) )
565           if( !(prev & ~BOTH) )
566                 return 1;
567           else
568 #ifdef unix
569                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
570 #else
571                 _InterlockedAnd16((ushort *)latch, ~XCL);
572 #endif
573         return 0;
574 }
575
576 //      clear write mode
577
578 void bt_spinreleasewrite(BtSpinLatch *latch)
579 {
580 #ifdef unix
581         __sync_fetch_and_and((ushort *)latch, ~BOTH);
582 #else
583         _InterlockedAnd16((ushort *)latch, ~BOTH);
584 #endif
585 }
586
587 //      decrement reader count
588
589 void bt_spinreleaseread(BtSpinLatch *latch)
590 {
591 #ifdef unix
592         __sync_fetch_and_add((ushort *)latch, -SHARE);
593 #else
594         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
595 #endif
596 }
597
598 //      link latch table entry into latch hash table
599
600 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
601 {
602 BtLatchSet *set = bt->mgr->latchsets + victim;
603
604         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
605                 bt->mgr->latchsets[set->next].prev = victim;
606
607         bt->mgr->latchmgr->table[hashidx].slot = victim;
608         set->page_no = page_no;
609         set->hash = hashidx;
610         set->prev = 0;
611 }
612
613 //      release latch pin
614
615 void bt_unpinlatch (BtLatchSet *set)
616 {
617 #ifdef unix
618         __sync_fetch_and_add(&set->pin, -1);
619 #else
620         _InterlockedDecrement16 (&set->pin);
621 #endif
622 }
623
624 //      find existing latchset or inspire new one
625 //      return with latchset pinned
626
627 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
628 {
629 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
630 ushort slot, avail = 0, victim, idx;
631 BtLatchSet *set;
632
633         //  obtain read lock on hash table entry
634
635         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
636
637         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
638         {
639                 set = bt->mgr->latchsets + slot;
640                 if( page_no == set->page_no )
641                         break;
642         } while( slot = set->next );
643
644         if( slot ) {
645 #ifdef unix
646                 __sync_fetch_and_add(&set->pin, 1);
647 #else
648                 _InterlockedIncrement16 (&set->pin);
649 #endif
650         }
651
652     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
653
654         if( slot )
655                 return set;
656
657   //  try again, this time with write lock
658
659   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
660
661   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
662   {
663         set = bt->mgr->latchsets + slot;
664         if( page_no == set->page_no )
665                 break;
666         if( !set->pin && !avail )
667                 avail = slot;
668   } while( slot = set->next );
669
670   //  found our entry, or take over an unpinned one
671
672   if( slot || (slot = avail) ) {
673         set = bt->mgr->latchsets + slot;
674 #ifdef unix
675         __sync_fetch_and_add(&set->pin, 1);
676 #else
677         _InterlockedIncrement16 (&set->pin);
678 #endif
679         set->page_no = page_no;
680         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
681         return set;
682   }
683
684         //  see if there are any unused entries
685 #ifdef unix
686         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
687 #else
688         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
689 #endif
690
691         if( victim < bt->mgr->latchmgr->latchtotal ) {
692                 set = bt->mgr->latchsets + victim;
693 #ifdef unix
694                 __sync_fetch_and_add(&set->pin, 1);
695 #else
696                 _InterlockedIncrement16 (&set->pin);
697 #endif
698                 bt_latchlink (bt, hashidx, victim, page_no);
699                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
700                 return set;
701         }
702
703 #ifdef unix
704         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
705 #else
706         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
707 #endif
708   //  find and reuse previous lock entry
709
710   while( 1 ) {
711 #ifdef unix
712         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
713 #else
714         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
715 #endif
716         //      we don't use slot zero
717
718         if( victim %= bt->mgr->latchmgr->latchtotal )
719                 set = bt->mgr->latchsets + victim;
720         else
721                 continue;
722
723         //      take control of our slot
724         //      from other threads
725
726         if( set->pin || !bt_spinwritetry (set->busy) )
727                 continue;
728
729         idx = set->hash;
730
731         // try to get write lock on hash chain
732         //      skip entry if not obtained
733         //      or has outstanding locks
734
735         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
736                 bt_spinreleasewrite (set->busy);
737                 continue;
738         }
739
740         if( set->pin ) {
741                 bt_spinreleasewrite (set->busy);
742                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
743                 continue;
744         }
745
746         //  unlink our available victim from its hash chain
747
748         if( set->prev )
749                 bt->mgr->latchsets[set->prev].next = set->next;
750         else
751                 bt->mgr->latchmgr->table[idx].slot = set->next;
752
753         if( set->next )
754                 bt->mgr->latchsets[set->next].prev = set->prev;
755
756         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
757 #ifdef unix
758         __sync_fetch_and_add(&set->pin, 1);
759 #else
760         _InterlockedIncrement16 (&set->pin);
761 #endif
762         bt_latchlink (bt, hashidx, victim, page_no);
763         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
764         bt_spinreleasewrite (set->busy);
765         return set;
766   }
767 }
768
769 void bt_mgrclose (BtMgr *mgr)
770 {
771 BtPool *pool;
772 uint slot;
773
774         // release mapped pages
775         //      note that slot zero is never used
776
777         for( slot = 1; slot < mgr->poolmax; slot++ ) {
778                 pool = mgr->pool + slot;
779                 if( pool->slot )
780 #ifdef unix
781                         munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
782 #else
783                 {
784                         FlushViewOfFile(pool->map, 0);
785                         UnmapViewOfFile(pool->map);
786                         CloseHandle(pool->hmap);
787                 }
788 #endif
789         }
790
791 #ifdef unix
792         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
793         munmap (mgr->latchmgr, 2 * mgr->page_size);
794 #else
795         FlushViewOfFile(mgr->latchmgr, 0);
796         UnmapViewOfFile(mgr->latchmgr);
797         CloseHandle(mgr->halloc);
798 #endif
799 #ifdef unix
800         close (mgr->idx);
801         free (mgr->pool);
802         free (mgr->hash);
803         free ((void *)mgr->latch);
804         free (mgr);
805 #else
806         FlushFileBuffers(mgr->idx);
807         CloseHandle(mgr->idx);
808         GlobalFree (mgr->pool);
809         GlobalFree (mgr->hash);
810         GlobalFree ((void *)mgr->latch);
811         GlobalFree (mgr);
812 #endif
813 }
814
815 //      close and release memory
816
817 void bt_close (BtDb *bt)
818 {
819 #ifdef unix
820         if( bt->mem )
821                 free (bt->mem);
822 #else
823         if( bt->mem)
824                 VirtualFree (bt->mem, 0, MEM_RELEASE);
825 #endif
826         free (bt);
827 }
828
829 //  open/create new btree buffer manager
830
831 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
832 //              size of mapped page pool (e.g. 8192)
833
834 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
835 {
836 uint lvl, attr, cacheblk, last, slot, idx;
837 uint nlatchpage, latchhash;
838 unsigned char value[BtId];
839 BtLatchMgr *latchmgr;
840 off64_t size;
841 uint amt[1];
842 BtMgr* mgr;
843 BtKey key;
844 BtVal val;
845 int flag;
846
847 #ifndef unix
848 SYSTEM_INFO sysinfo[1];
849 #endif
850
851         // determine sanity of page size and buffer pool
852
853         if( bits > BT_maxbits )
854                 bits = BT_maxbits;
855         else if( bits < BT_minbits )
856                 bits = BT_minbits;
857
858         if( !poolmax )
859                 return NULL;    // must have buffer pool
860
861 #ifdef unix
862         mgr = calloc (1, sizeof(BtMgr));
863
864         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
865
866         if( mgr->idx == -1 )
867                 return free(mgr), NULL;
868         
869         cacheblk = 4096;        // minimum mmap segment size for unix
870
871 #else
872         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
873         attr = FILE_ATTRIBUTE_NORMAL;
874         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
875
876         if( mgr->idx == INVALID_HANDLE_VALUE )
877                 return GlobalFree(mgr), NULL;
878
879         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
880         GetSystemInfo(sysinfo);
881         cacheblk = sysinfo->dwAllocationGranularity;
882 #endif
883
884 #ifdef unix
885         latchmgr = malloc (BT_maxpage);
886         *amt = 0;
887
888         // read minimum page size to get root info
889
890         if( size = lseek (mgr->idx, 0L, 2) ) {
891                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
892                         bits = latchmgr->alloc->bits;
893                 else
894                         return free(mgr), free(latchmgr), NULL;
895         } else if( mode == BT_ro )
896                 return free(latchmgr), bt_mgrclose (mgr), NULL;
897 #else
898         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
899         size = GetFileSize(mgr->idx, amt);
900
901         if( size || *amt ) {
902                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
903                         return bt_mgrclose (mgr), NULL;
904                 bits = latchmgr->alloc->bits;
905         } else if( mode == BT_ro )
906                 return bt_mgrclose (mgr), NULL;
907 #endif
908
909         mgr->page_size = 1 << bits;
910         mgr->page_bits = bits;
911
912         mgr->poolmax = poolmax;
913         mgr->mode = mode;
914
915         if( cacheblk < mgr->page_size )
916                 cacheblk = mgr->page_size;
917
918         //  mask for partial memmaps
919
920         mgr->poolmask = (cacheblk >> bits) - 1;
921
922         //      see if requested size of pages per memmap is greater
923
924         if( (1 << segsize) > mgr->poolmask )
925                 mgr->poolmask = (1 << segsize) - 1;
926
927         mgr->seg_bits = 0;
928
929         while( (1 << mgr->seg_bits) <= mgr->poolmask )
930                 mgr->seg_bits++;
931
932         mgr->hashsize = hashsize;
933
934 #ifdef unix
935         mgr->pool = calloc (poolmax, sizeof(BtPool));
936         mgr->hash = calloc (hashsize, sizeof(ushort));
937         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
938 #else
939         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
940         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
941         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
942 #endif
943
944         if( size || *amt )
945                 goto mgrlatch;
946
947         // initialize an empty b-tree with latch page, root page, page of leaves
948         // and page(s) of latches
949
950         memset (latchmgr, 0, 1 << bits);
951         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
952         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
953         latchmgr->alloc->bits = mgr->page_bits;
954
955         latchmgr->nlatchpage = nlatchpage;
956         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
957
958         //  initialize latch manager
959
960         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
961
962         //      size of hash table = total number of latchsets
963
964         if( latchhash > latchmgr->latchtotal )
965                 latchhash = latchmgr->latchtotal;
966
967         latchmgr->latchhash = latchhash;
968
969 #ifdef unix
970         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
971                 return bt_mgrclose (mgr), NULL;
972 #else
973         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
974                 return bt_mgrclose (mgr), NULL;
975
976         if( *amt < mgr->page_size )
977                 return bt_mgrclose (mgr), NULL;
978 #endif
979
980         memset (latchmgr, 0, 1 << bits);
981         latchmgr->alloc->bits = mgr->page_bits;
982
983         for( lvl=MIN_lvl; lvl--; ) {
984                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
985                 key = keyptr(latchmgr->alloc, 1);
986                 key->len = 2;           // create stopper key
987                 key->key[0] = 0xff;
988                 key->key[1] = 0xff;
989
990                 bt_putid(value, MIN_lvl - lvl + 1);
991                 val = valptr(latchmgr->alloc, 1);
992                 val->len = lvl ? BtId : 0;
993                 memcpy (val->value, value, val->len);
994
995                 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
996                 latchmgr->alloc->lvl = lvl;
997                 latchmgr->alloc->cnt = 1;
998                 latchmgr->alloc->act = 1;
999 #ifdef unix
1000                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
1001                         return bt_mgrclose (mgr), NULL;
1002 #else
1003                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1004                         return bt_mgrclose (mgr), NULL;
1005
1006                 if( *amt < mgr->page_size )
1007                         return bt_mgrclose (mgr), NULL;
1008 #endif
1009         }
1010
1011         // clear out latch manager locks
1012         //      and rest of pages to round out segment
1013
1014         memset(latchmgr, 0, mgr->page_size);
1015         last = MIN_lvl + 1;
1016
1017         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
1018 #ifdef unix
1019                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1020 #else
1021                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1022                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1023                         return bt_mgrclose (mgr), NULL;
1024                 if( *amt < mgr->page_size )
1025                         return bt_mgrclose (mgr), NULL;
1026 #endif
1027                 last++;
1028         }
1029
1030 mgrlatch:
1031 #ifdef unix
1032         // mlock the root page and the latchmgr page
1033
1034         flag = PROT_READ | PROT_WRITE;
1035         mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1036         if( mgr->latchmgr == MAP_FAILED )
1037                 return bt_mgrclose (mgr), NULL;
1038         mlock (mgr->latchmgr, 2 * mgr->page_size);
1039
1040         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1041         if( mgr->latchsets == MAP_FAILED )
1042                 return bt_mgrclose (mgr), NULL;
1043         mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1044 #else
1045         flag = PAGE_READWRITE;
1046         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1047         if( !mgr->halloc )
1048                 return bt_mgrclose (mgr), NULL;
1049
1050         flag = FILE_MAP_WRITE;
1051         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1052         if( !mgr->latchmgr )
1053                 return GetLastError(), bt_mgrclose (mgr), NULL;
1054
1055         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1056 #endif
1057
1058 #ifdef unix
1059         free (latchmgr);
1060 #else
1061         VirtualFree (latchmgr, 0, MEM_RELEASE);
1062 #endif
1063         return mgr;
1064 }
1065
1066 //      open BTree access method
1067 //      based on buffer manager
1068
1069 BtDb *bt_open (BtMgr *mgr)
1070 {
1071 BtDb *bt = malloc (sizeof(*bt));
1072
1073         memset (bt, 0, sizeof(*bt));
1074         bt->mgr = mgr;
1075 #ifdef unix
1076         bt->mem = malloc (2 *mgr->page_size);
1077 #else
1078         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1079 #endif
1080         bt->frame = (BtPage)bt->mem;
1081         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1082         return bt;
1083 }
1084
1085 //  compare two keys, returning > 0, = 0, or < 0
1086 //  as the comparison value
1087
1088 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1089 {
1090 uint len1 = key1->len;
1091 int ans;
1092
1093         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1094                 return ans;
1095
1096         if( len1 > len2 )
1097                 return 1;
1098         if( len1 < len2 )
1099                 return -1;
1100
1101         return 0;
1102 }
1103
1104 //      Buffer Pool mgr
1105
1106 // find segment in pool
1107 // must be called with hashslot idx locked
1108 //      return NULL if not there
1109 //      otherwise return node
1110
1111 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1112 {
1113 BtPool *pool;
1114 uint slot;
1115
1116         // compute start of hash chain in pool
1117
1118         if( slot = bt->mgr->hash[idx] ) 
1119                 pool = bt->mgr->pool + slot;
1120         else
1121                 return NULL;
1122
1123         page_no &= ~bt->mgr->poolmask;
1124
1125         while( pool->basepage != page_no )
1126           if( pool = pool->hashnext )
1127                 continue;
1128           else
1129                 return NULL;
1130
1131         return pool;
1132 }
1133
1134 // add segment to hash table
1135
1136 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1137 {
1138 BtPool *node;
1139 uint slot;
1140
1141         pool->hashprev = pool->hashnext = NULL;
1142         pool->basepage = page_no & ~bt->mgr->poolmask;
1143         pool->pin = CLOCK_bit + 1;
1144
1145         if( slot = bt->mgr->hash[idx] ) {
1146                 node = bt->mgr->pool + slot;
1147                 pool->hashnext = node;
1148                 node->hashprev = pool;
1149         }
1150
1151         bt->mgr->hash[idx] = pool->slot;
1152 }
1153
1154 //  map new buffer pool segment to virtual memory
1155
1156 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1157 {
1158 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1159 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1160 int flag;
1161
1162 #ifdef unix
1163         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1164         pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1165         if( pool->map == MAP_FAILED )
1166                 return bt->err = BTERR_map;
1167
1168 #else
1169         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1170         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1171         if( !pool->hmap )
1172                 return bt->err = BTERR_map;
1173
1174         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1175         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1176         if( !pool->map )
1177                 return bt->err = BTERR_map;
1178 #endif
1179         return bt->err = 0;
1180 }
1181
1182 //      calculate page within pool
1183
1184 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1185 {
1186 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1187 BtPage page;
1188
1189         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1190 //      madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1191         return page;
1192 }
1193
1194 //  release pool pin
1195
1196 void bt_unpinpool (BtPool *pool)
1197 {
1198 #ifdef unix
1199         __sync_fetch_and_add(&pool->pin, -1);
1200 #else
1201         _InterlockedDecrement16 (&pool->pin);
1202 #endif
1203 }
1204
1205 //      find or place requested page in segment-pool
1206 //      return pool table entry, incrementing pin
1207
1208 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1209 {
1210 uint slot, hashidx, idx, victim;
1211 BtPool *pool, *node, *next;
1212
1213         //      lock hash table chain
1214
1215         hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1216         bt_spinwritelock (&bt->mgr->latch[hashidx]);
1217
1218         //      look up in hash table
1219
1220         if( pool = bt_findpool(bt, page_no, hashidx) ) {
1221 #ifdef unix
1222                 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1223                 __sync_fetch_and_add(&pool->pin, 1);
1224 #else
1225                 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1226                 _InterlockedIncrement16 (&pool->pin);
1227 #endif
1228                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1229                 return pool;
1230         }
1231
1232         // allocate a new pool node
1233         // and add to hash table
1234
1235 #ifdef unix
1236         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1237 #else
1238         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1239 #endif
1240
1241         if( ++slot < bt->mgr->poolmax ) {
1242                 pool = bt->mgr->pool + slot;
1243                 pool->slot = slot;
1244
1245                 if( bt_mapsegment(bt, pool, page_no) )
1246                         return NULL;
1247
1248                 bt_linkhash(bt, pool, page_no, hashidx);
1249                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1250                 return pool;
1251         }
1252
1253         // pool table is full
1254         //      find best pool entry to evict
1255
1256 #ifdef unix
1257         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1258 #else
1259         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1260 #endif
1261
1262         while( 1 ) {
1263 #ifdef unix
1264                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1265 #else
1266                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1267 #endif
1268                 victim %= bt->mgr->poolmax;
1269                 pool = bt->mgr->pool + victim;
1270                 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1271
1272                 if( !victim )
1273                         continue;
1274
1275                 // try to get write lock
1276                 //      skip entry if not obtained
1277
1278                 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1279                         continue;
1280
1281                 //      skip this entry if
1282                 //      page is pinned
1283                 //  or clock bit is set
1284
1285                 if( pool->pin ) {
1286 #ifdef unix
1287                         __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1288 #else
1289                         _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1290 #endif
1291                         bt_spinreleasewrite (&bt->mgr->latch[idx]);
1292                         continue;
1293                 }
1294
1295                 // unlink victim pool node from hash table
1296
1297                 if( node = pool->hashprev )
1298                         node->hashnext = pool->hashnext;
1299                 else if( node = pool->hashnext )
1300                         bt->mgr->hash[idx] = node->slot;
1301                 else
1302                         bt->mgr->hash[idx] = 0;
1303
1304                 if( node = pool->hashnext )
1305                         node->hashprev = pool->hashprev;
1306
1307                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1308
1309                 //      remove old file mapping
1310 #ifdef unix
1311                 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1312 #else
1313 //              FlushViewOfFile(pool->map, 0);
1314                 UnmapViewOfFile(pool->map);
1315                 CloseHandle(pool->hmap);
1316 #endif
1317                 pool->map = NULL;
1318
1319                 //  create new pool mapping
1320                 //  and link into hash table
1321
1322                 if( bt_mapsegment(bt, pool, page_no) )
1323                         return NULL;
1324
1325                 bt_linkhash(bt, pool, page_no, hashidx);
1326                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1327                 return pool;
1328         }
1329 }
1330
1331 // place write, read, or parent lock on requested page_no.
1332
1333 void bt_lockpage(BtLock mode, BtLatchSet *set)
1334 {
1335         switch( mode ) {
1336         case BtLockRead:
1337                 ReadLock (set->readwr);
1338                 break;
1339         case BtLockWrite:
1340                 WriteLock (set->readwr);
1341                 break;
1342         case BtLockAccess:
1343                 ReadLock (set->access);
1344                 break;
1345         case BtLockDelete:
1346                 WriteLock (set->access);
1347                 break;
1348         case BtLockParent:
1349                 WriteLock (set->parent);
1350                 break;
1351         }
1352 }
1353
1354 // remove write, read, or parent lock on requested page
1355
1356 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1357 {
1358         switch( mode ) {
1359         case BtLockRead:
1360                 ReadRelease (set->readwr);
1361                 break;
1362         case BtLockWrite:
1363                 WriteRelease (set->readwr);
1364                 break;
1365         case BtLockAccess:
1366                 ReadRelease (set->access);
1367                 break;
1368         case BtLockDelete:
1369                 WriteRelease (set->access);
1370                 break;
1371         case BtLockParent:
1372                 WriteRelease (set->parent);
1373                 break;
1374         }
1375 }
1376
1377 //      allocate a new page and write page into it
1378
1379 uid bt_newpage(BtDb *bt, BtPage page)
1380 {
1381 BtPageSet set[1];
1382 uid new_page;
1383 int blk;
1384
1385         //      lock allocation page
1386
1387         bt_spinwritelock(bt->mgr->latchmgr->lock);
1388
1389         // use empty chain first
1390         // else allocate empty page
1391
1392         if( new_page = bt_getid(bt->mgr->latchmgr->chain) ) {
1393                 if( set->pool = bt_pinpool (bt, new_page) )
1394                         set->page = bt_page (bt, set->pool, new_page);
1395                 else
1396                         return 0;
1397
1398                 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1399                 bt_unpinpool (set->pool);
1400         } else {
1401                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1402                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1403 #ifdef unix
1404                 // if writing first page of pool block, set file length thru last page
1405
1406                 if( (new_page & bt->mgr->poolmask) == 0 )
1407                         ftruncate (bt->mgr->idx, (new_page + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1408 #endif
1409         }
1410 #ifdef unix
1411         // unlock allocation latch
1412
1413         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1414 #endif
1415
1416         //      bring new page into pool and copy page.
1417         //      this will extend the file into the new pages on WIN32.
1418
1419         if( set->pool = bt_pinpool (bt, new_page) )
1420                 set->page = bt_page (bt, set->pool, new_page);
1421         else
1422                 return 0;
1423
1424         memcpy(set->page, page, bt->mgr->page_size);
1425         bt_unpinpool (set->pool);
1426
1427 #ifndef unix
1428         // unlock allocation latch
1429
1430         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1431 #endif
1432         return new_page;
1433 }
1434
1435 //  find slot in page for given key at a given level
1436
1437 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1438 {
1439 uint diff, higher = set->page->cnt, low = 1, slot;
1440 uint good = 0;
1441
1442         //        make stopper key an infinite fence value
1443
1444         if( bt_getid (set->page->right) )
1445                 higher++;
1446         else
1447                 good++;
1448
1449         //      low is the lowest candidate.
1450         //  loop ends when they meet
1451
1452         //  higher is already
1453         //      tested as .ge. the passed key.
1454
1455         while( diff = higher - low ) {
1456                 slot = low + ( diff >> 1 );
1457                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1458                         low = slot + 1;
1459                 else
1460                         higher = slot, good++;
1461         }
1462
1463         //      return zero if key is on right link page
1464
1465         return good ? higher : 0;
1466 }
1467
1468 //  find and load page at given level for given key
1469 //      leave page rd or wr locked as requested
1470
1471 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1472 {
1473 uid page_no = ROOT_page, prevpage = 0;
1474 uint drill = 0xff, slot;
1475 BtLatchSet *prevlatch;
1476 uint mode, prevmode;
1477 BtPool *prevpool;
1478
1479   //  start at root of btree and drill down
1480
1481   do {
1482         // determine lock mode of drill level
1483         mode = (drill == lvl) ? lock : BtLockRead; 
1484
1485         set->latch = bt_pinlatch (bt, page_no);
1486         set->page_no = page_no;
1487
1488         // pin page contents
1489
1490         if( set->pool = bt_pinpool (bt, page_no) )
1491                 set->page = bt_page (bt, set->pool, page_no);
1492         else
1493                 return 0;
1494
1495         // obtain access lock using lock chaining with Access mode
1496
1497         if( page_no > ROOT_page )
1498           bt_lockpage(BtLockAccess, set->latch);
1499
1500         //      release & unpin parent page
1501
1502         if( prevpage ) {
1503           bt_unlockpage(prevmode, prevlatch);
1504           bt_unpinlatch (prevlatch);
1505           bt_unpinpool (prevpool);
1506           prevpage = 0;
1507         }
1508
1509         // obtain read lock using lock chaining
1510
1511         bt_lockpage(mode, set->latch);
1512
1513         if( set->page->free )
1514                 return bt->err = BTERR_struct, 0;
1515
1516         if( page_no > ROOT_page )
1517           bt_unlockpage(BtLockAccess, set->latch);
1518
1519         // re-read and re-lock root after determining actual level of root
1520
1521         if( set->page->lvl != drill) {
1522                 if( set->page_no != ROOT_page )
1523                         return bt->err = BTERR_struct, 0;
1524                         
1525                 drill = set->page->lvl;
1526
1527                 if( lock != BtLockRead && drill == lvl ) {
1528                   bt_unlockpage(mode, set->latch);
1529                   bt_unpinlatch (set->latch);
1530                   bt_unpinpool (set->pool);
1531                   continue;
1532                 }
1533         }
1534
1535         prevpage = set->page_no;
1536         prevlatch = set->latch;
1537         prevpool = set->pool;
1538         prevmode = mode;
1539
1540         //  find key on page at this level
1541         //  and descend to requested level
1542
1543         if( set->page->kill )
1544           goto slideright;
1545
1546         if( slot = bt_findslot (set, key, len) ) {
1547           if( drill == lvl )
1548                 return slot;
1549
1550           while( slotptr(set->page, slot)->dead )
1551                 if( slot++ < set->page->cnt )
1552                         continue;
1553                 else
1554                         goto slideright;
1555
1556           page_no = bt_getid(valptr(set->page, slot)->value);
1557           drill--;
1558           continue;
1559         }
1560
1561         //  or slide right into next page
1562
1563 slideright:
1564         page_no = bt_getid(set->page->right);
1565
1566   } while( page_no );
1567
1568   // return error on end of right chain
1569
1570   bt->err = BTERR_struct;
1571   return 0;     // return error
1572 }
1573
1574 //      return page to free list
1575 //      page must be delete & write locked
1576
1577 void bt_freepage (BtDb *bt, BtPageSet *set)
1578 {
1579         //      lock allocation page
1580
1581         bt_spinwritelock (bt->mgr->latchmgr->lock);
1582
1583         //      store chain
1584         memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1585         bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1586         set->page->free = 1;
1587
1588         // unlock released page
1589
1590         bt_unlockpage (BtLockDelete, set->latch);
1591         bt_unlockpage (BtLockWrite, set->latch);
1592         bt_unpinlatch (set->latch);
1593         bt_unpinpool (set->pool);
1594
1595         // unlock allocation page
1596
1597         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1598 }
1599
1600 //      a fence key was deleted from a page
1601 //      push new fence value upwards
1602
1603 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1604 {
1605 unsigned char leftkey[256], rightkey[256];
1606 unsigned char value[BtId];
1607 BtKey ptr;
1608
1609         //      remove the old fence value
1610
1611         ptr = keyptr(set->page, set->page->cnt);
1612         memcpy (rightkey, ptr, ptr->len + 1);
1613         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1614
1615         //  cache new fence value
1616
1617         ptr = keyptr(set->page, set->page->cnt);
1618         memcpy (leftkey, ptr, ptr->len + 1);
1619
1620         bt_lockpage (BtLockParent, set->latch);
1621         bt_unlockpage (BtLockWrite, set->latch);
1622
1623         //      insert new (now smaller) fence key
1624
1625         bt_putid (value, set->page_no);
1626
1627         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId, 1) )
1628           return bt->err;
1629
1630         //      now delete old fence key
1631
1632         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1633                 return bt->err;
1634
1635         bt_unlockpage (BtLockParent, set->latch);
1636         bt_unpinlatch(set->latch);
1637         bt_unpinpool (set->pool);
1638         return 0;
1639 }
1640
1641 //      root has a single child
1642 //      collapse a level from the tree
1643
1644 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1645 {
1646 BtPageSet child[1];
1647 uint idx;
1648
1649   // find the child entry and promote as new root contents
1650
1651   do {
1652         for( idx = 0; idx++ < root->page->cnt; )
1653           if( !slotptr(root->page, idx)->dead )
1654                 break;
1655
1656         child->page_no = bt_getid (valptr(root->page, idx)->value);
1657
1658         child->latch = bt_pinlatch (bt, child->page_no);
1659         bt_lockpage (BtLockDelete, child->latch);
1660         bt_lockpage (BtLockWrite, child->latch);
1661
1662         if( child->pool = bt_pinpool (bt, child->page_no) )
1663                 child->page = bt_page (bt, child->pool, child->page_no);
1664         else
1665                 return bt->err;
1666
1667         memcpy (root->page, child->page, bt->mgr->page_size);
1668         bt_freepage (bt, child);
1669
1670   } while( root->page->lvl > 1 && root->page->act == 1 );
1671
1672   bt_unlockpage (BtLockWrite, root->latch);
1673   bt_unpinlatch (root->latch);
1674   bt_unpinpool (root->pool);
1675   return 0;
1676 }
1677
1678 //  find and delete key on page by marking delete flag bit
1679 //  if page becomes empty, delete it from the btree
1680
1681 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1682 {
1683 unsigned char lowerfence[256], higherfence[256];
1684 uint slot, idx, found, fence;
1685 BtPageSet set[1], right[1];
1686 unsigned char value[BtId];
1687 BtKey ptr, tst;
1688 BtVal val;
1689
1690         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1691                 ptr = keyptr(set->page, slot);
1692         else
1693                 return bt->err;
1694
1695         // if librarian slot, advance to real slot
1696
1697         if( slotptr(set->page, slot)->type == Librarian )
1698                 ptr = keyptr(set->page, ++slot);
1699
1700         fence = slot == set->page->cnt;
1701
1702         // if key is found delete it, otherwise ignore request
1703
1704         if( found = !keycmp (ptr, key, len) )
1705           if( found = slotptr(set->page, slot)->dead == 0 ) {
1706                 val = valptr(set->page,slot);
1707                 slotptr(set->page, slot)->dead = 1;
1708                 set->page->garbage += ptr->len + val->len + 2;
1709                 set->page->act--;
1710
1711                 // collapse empty slots beneath our fence
1712
1713                 while( idx = set->page->cnt - 1 )
1714                   if( slotptr(set->page, idx)->dead ) {
1715                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1716                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1717                   } else
1718                         break;
1719           }
1720
1721         //      did we delete a fence key in an upper level?
1722
1723         if( found && lvl && fence && set->page->act )
1724           if( bt_fixfence (bt, set, lvl) )
1725                 return bt->err;
1726           else
1727                 return bt->found = found, 0;
1728
1729         //      do we need to collapse root?
1730
1731         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1732           if( bt_collapseroot (bt, set) )
1733                 return bt->err;
1734           else
1735                 return bt->found = found, 0;
1736
1737         //      return if page is not empty
1738
1739         if( set->page->act ) {
1740                 bt_unlockpage(BtLockWrite, set->latch);
1741                 bt_unpinlatch (set->latch);
1742                 bt_unpinpool (set->pool);
1743                 return bt->found = found, 0;
1744         }
1745
1746         //      cache copy of fence key
1747         //      to post in parent
1748
1749         ptr = keyptr(set->page, set->page->cnt);
1750         memcpy (lowerfence, ptr, ptr->len + 1);
1751
1752         //      obtain lock on right page
1753
1754         right->page_no = bt_getid(set->page->right);
1755         right->latch = bt_pinlatch (bt, right->page_no);
1756         bt_lockpage (BtLockWrite, right->latch);
1757
1758         // pin page contents
1759
1760         if( right->pool = bt_pinpool (bt, right->page_no) )
1761                 right->page = bt_page (bt, right->pool, right->page_no);
1762         else
1763                 return 0;
1764
1765         if( right->page->kill )
1766                 return bt->err = BTERR_struct;
1767
1768         // pull contents of right peer into our empty page
1769
1770         memcpy (set->page, right->page, bt->mgr->page_size);
1771
1772         // cache copy of key to update
1773
1774         ptr = keyptr(right->page, right->page->cnt);
1775         memcpy (higherfence, ptr, ptr->len + 1);
1776
1777         // mark right page deleted and point it to left page
1778         //      until we can post parent updates
1779
1780         bt_putid (right->page->right, set->page_no);
1781         right->page->kill = 1;
1782
1783         bt_lockpage (BtLockParent, right->latch);
1784         bt_unlockpage (BtLockWrite, right->latch);
1785
1786         bt_lockpage (BtLockParent, set->latch);
1787         bt_unlockpage (BtLockWrite, set->latch);
1788
1789         // redirect higher key directly to our new node contents
1790
1791         bt_putid (value, set->page_no);
1792
1793         if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId, 1) )
1794           return bt->err;
1795
1796         //      delete old lower key to our node
1797
1798         if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1799           return bt->err;
1800
1801         //      obtain delete and write locks to right node
1802
1803         bt_unlockpage (BtLockParent, right->latch);
1804         bt_lockpage (BtLockDelete, right->latch);
1805         bt_lockpage (BtLockWrite, right->latch);
1806         bt_freepage (bt, right);
1807
1808         bt_unlockpage (BtLockParent, set->latch);
1809         bt_unpinlatch (set->latch);
1810         bt_unpinpool (set->pool);
1811         bt->found = found;
1812         return 0;
1813 }
1814
1815 BtKey bt_foundkey (BtDb *bt)
1816 {
1817         return (BtKey)(bt->key);
1818 }
1819
1820 //      advance to next slot
1821
1822 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1823 {
1824 BtLatchSet *prevlatch;
1825 BtPool *prevpool;
1826 uid page_no;
1827
1828         if( slot < set->page->cnt )
1829                 return slot + 1;
1830
1831         prevlatch = set->latch;
1832         prevpool = set->pool;
1833
1834         if( page_no = bt_getid(set->page->right) )
1835                 set->latch = bt_pinlatch (bt, page_no);
1836         else
1837                 return bt->err = BTERR_struct, 0;
1838
1839         // pin page contents
1840
1841         if( set->pool = bt_pinpool (bt, page_no) )
1842                 set->page = bt_page (bt, set->pool, page_no);
1843         else
1844                 return 0;
1845
1846         // obtain access lock using lock chaining with Access mode
1847
1848         bt_lockpage(BtLockAccess, set->latch);
1849
1850         bt_unlockpage(BtLockRead, prevlatch);
1851         bt_unpinlatch (prevlatch);
1852         bt_unpinpool (prevpool);
1853
1854         bt_lockpage(BtLockRead, set->latch);
1855         bt_unlockpage(BtLockAccess, set->latch);
1856
1857         set->page_no = page_no;
1858         return 1;
1859 }
1860
1861 //      find unique key or first duplicate key in
1862 //      leaf level and return number of value bytes
1863 //      or (-1) if not found.  Setup key for bt_foundkey
1864
1865 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1866 {
1867 BtPageSet set[1];
1868 uint len, slot;
1869 int ret = -1;
1870 BtKey ptr;
1871 BtVal val;
1872
1873   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1874    do {
1875         ptr = keyptr(set->page, slot);
1876
1877         //      skip librarian slot place holder
1878
1879         if( slotptr(set->page, slot)->type == Librarian )
1880                 ptr = keyptr(set->page, ++slot);
1881
1882         if( slotptr(set->page, slot)->dead )
1883                 continue;
1884
1885         //      return actual key found
1886
1887         memcpy (bt->key, ptr, ptr->len + 1);
1888         len = ptr->len;
1889
1890         if( slotptr(set->page, slot)->type == Duplicate )
1891                 len -= BtId;
1892
1893         //      not there if we reach the stopper key
1894
1895         if( slot == set->page->cnt )
1896           if( !bt_getid (set->page->right) )
1897                 break;
1898
1899         // if key exists, return >= 0 value bytes copied
1900         //      otherwise return (-1)
1901
1902         if( keylen == len )
1903           if( !memcmp (ptr->key, key, len) ) {
1904                 val = valptr (set->page,slot);
1905                 if( valmax > val->len )
1906                         valmax = val->len;
1907                 memcpy (value, val->value, valmax);
1908                 ret = valmax;
1909           }
1910
1911         break;
1912
1913    } while( slot = bt_findnext (bt, set, slot) );
1914
1915   bt_unlockpage (BtLockRead, set->latch);
1916   bt_unpinlatch (set->latch);
1917   bt_unpinpool (set->pool);
1918   return ret;
1919 }
1920
1921 //      check page for space available,
1922 //      clean if necessary and return
1923 //      0 - page needs splitting
1924 //      >0  new slot value
1925
1926 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1927 {
1928 uint nxt = bt->mgr->page_size;
1929 uint cnt = 0, idx = 0;
1930 uint max = page->cnt;
1931 uint newslot = max;
1932 BtKey key;
1933 BtVal val;
1934
1935         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1936                 return slot;
1937
1938         //      skip cleanup and proceed to split
1939         //      if there's not enough garbage
1940         //      to bother with.
1941
1942         if( page->garbage < nxt / 5 )
1943                 return 0;
1944
1945         memcpy (bt->frame, page, bt->mgr->page_size);
1946
1947         // skip page info and set rest of page to zero
1948
1949         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1950         page->garbage = 0;
1951         page->act = 0;
1952
1953         // clean up page first by
1954         // removing deleted keys
1955
1956         while( cnt++ < max ) {
1957                 if( cnt == slot )
1958                         newslot = idx + 2;
1959                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1960                         continue;
1961
1962                 // copy the value across
1963
1964                 val = valptr(bt->frame, cnt);
1965                 nxt -= val->len + 1;
1966                 ((unsigned char *)page)[nxt] = val->len;
1967                 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1968
1969                 // copy the key across
1970
1971                 key = keyptr(bt->frame, cnt);
1972                 nxt -= key->len + 1;
1973                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1974
1975                 // make a librarian slot
1976
1977                 if( idx ) {
1978                         slotptr(page, ++idx)->off = nxt;
1979                         slotptr(page, idx)->type = Librarian;
1980                         slotptr(page, idx)->dead = 1;
1981                 }
1982
1983                 // set up the slot
1984
1985                 slotptr(page, ++idx)->off = nxt;
1986                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1987
1988                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1989                         page->act++;
1990         }
1991
1992         page->min = nxt;
1993         page->cnt = idx;
1994
1995         //      see if page has enough space now, or does it need splitting?
1996
1997         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
1998                 return newslot;
1999
2000         return 0;
2001 }
2002
2003 // split the root and raise the height of the btree
2004
2005 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
2006 {
2007 uint nxt = bt->mgr->page_size;
2008 unsigned char value[BtId];
2009 uid left;
2010
2011         //  Obtain an empty page to use, and copy the current
2012         //  root contents into it, e.g. lower keys
2013
2014         if( !(left = bt_newpage(bt, root->page)) )
2015                 return bt->err;
2016
2017         // preserve the page info at the bottom
2018         // of higher keys and set rest to zero
2019
2020         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2021
2022         // insert lower keys page fence key on newroot page as first key
2023
2024         nxt -= BtId + 1;
2025         bt_putid (value, left);
2026         ((unsigned char *)root->page)[nxt] = BtId;
2027         memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
2028
2029         nxt -= *leftkey + 1;
2030         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
2031         slotptr(root->page, 1)->off = nxt;
2032         
2033         // insert stopper key on newroot page
2034         // and increase the root height
2035
2036         nxt -= 3 + BtId + 1;
2037         ((unsigned char *)root->page)[nxt] = 2;
2038         ((unsigned char *)root->page)[nxt+1] = 0xff;
2039         ((unsigned char *)root->page)[nxt+2] = 0xff;
2040
2041         bt_putid (value, page_no2);
2042         ((unsigned char *)root->page)[nxt+3] = BtId;
2043         memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
2044         slotptr(root->page, 2)->off = nxt;
2045
2046         bt_putid(root->page->right, 0);
2047         root->page->min = nxt;          // reset lowest used offset and key count
2048         root->page->cnt = 2;
2049         root->page->act = 2;
2050         root->page->lvl++;
2051
2052         // release and unpin root
2053
2054         bt_unlockpage(BtLockWrite, root->latch);
2055         bt_unpinlatch (root->latch);
2056         bt_unpinpool (root->pool);
2057         return 0;
2058 }
2059
2060 //  split already locked full node
2061 //      return unlocked.
2062
2063 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2064 {
2065 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
2066 unsigned char fencekey[256], rightkey[256];
2067 unsigned char value[BtId];
2068 uint lvl = set->page->lvl;
2069 BtPageSet right[1];
2070 uint prev;
2071 BtKey key;
2072 BtVal val;
2073
2074         //  split higher half of keys to bt->frame
2075
2076         memset (bt->frame, 0, bt->mgr->page_size);
2077         max = set->page->cnt;
2078         cnt = max / 2;
2079         idx = 0;
2080
2081         while( cnt++ < max ) {
2082                 if( slotptr(set->page, cnt)->dead && cnt < max )
2083                         continue;
2084                 val = valptr(set->page, cnt);
2085                 nxt -= val->len + 1;
2086                 ((unsigned char *)bt->frame)[nxt] = val->len;
2087                 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
2088
2089                 key = keyptr(set->page, cnt);
2090                 nxt -= key->len + 1;
2091                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
2092
2093                 //      add librarian slot
2094
2095                 if( idx ) {
2096                         slotptr(bt->frame, ++idx)->off = nxt;
2097                         slotptr(bt->frame, idx)->type = Librarian;
2098                         slotptr(bt->frame, idx)->dead = 1;
2099                 }
2100
2101                 //  add actual slot
2102
2103                 slotptr(bt->frame, ++idx)->off = nxt;
2104                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
2105
2106                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2107                         bt->frame->act++;
2108         }
2109
2110         // remember existing fence key for new page to the right
2111
2112         memcpy (rightkey, key, key->len + 1);
2113
2114         bt->frame->bits = bt->mgr->page_bits;
2115         bt->frame->min = nxt;
2116         bt->frame->cnt = idx;
2117         bt->frame->lvl = lvl;
2118
2119         // link right node
2120
2121         if( set->page_no > ROOT_page )
2122                 memcpy (bt->frame->right, set->page->right, BtId);
2123
2124         //      get new free page and write higher keys to it.
2125
2126         if( !(right->page_no = bt_newpage(bt, bt->frame)) )
2127                 return bt->err;
2128
2129         //      update lower keys to continue in old page
2130
2131         memcpy (bt->frame, set->page, bt->mgr->page_size);
2132         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2133         nxt = bt->mgr->page_size;
2134         set->page->garbage = 0;
2135         set->page->act = 0;
2136         max /= 2;
2137         cnt = 0;
2138         idx = 0;
2139
2140         if( slotptr(bt->frame, max)->type == Librarian )
2141                 max--;
2142
2143         //  assemble page of smaller keys
2144
2145         while( cnt++ < max ) {
2146                 if( slotptr(bt->frame, cnt)->dead )
2147                         continue;
2148                 val = valptr(bt->frame, cnt);
2149                 nxt -= val->len + 1;
2150                 ((unsigned char *)set->page)[nxt] = val->len;
2151                 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2152
2153                 key = keyptr(bt->frame, cnt);
2154                 nxt -= key->len + 1;
2155                 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2156
2157                 //      add librarian slot
2158
2159                 if( idx ) {
2160                         slotptr(set->page, ++idx)->off = nxt;
2161                         slotptr(set->page, idx)->type = Librarian;
2162                         slotptr(set->page, idx)->dead = 1;
2163                 }
2164
2165                 //      add actual slot
2166
2167                 slotptr(set->page, ++idx)->off = nxt;
2168                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2169                 set->page->act++;
2170         }
2171
2172         // remember fence key for smaller page
2173
2174         memcpy(fencekey, key, key->len + 1);
2175
2176         bt_putid(set->page->right, right->page_no);
2177         set->page->min = nxt;
2178         set->page->cnt = idx;
2179
2180         // if current page is the root page, split it
2181
2182         if( set->page_no == ROOT_page )
2183                 return bt_splitroot (bt, set, fencekey, right->page_no);
2184
2185         // insert new fences in their parent pages
2186
2187         right->latch = bt_pinlatch (bt, right->page_no);
2188         bt_lockpage (BtLockParent, right->latch);
2189
2190         bt_lockpage (BtLockParent, set->latch);
2191         bt_unlockpage (BtLockWrite, set->latch);
2192
2193         // insert new fence for reformulated left block of smaller keys
2194
2195         bt_putid (value, set->page_no);
2196
2197         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 1) )
2198                 return bt->err;
2199
2200         // switch fence for right block of larger keys to new right page
2201
2202         bt_putid (value, right->page_no);
2203
2204         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, 1) )
2205                 return bt->err;
2206
2207         bt_unlockpage (BtLockParent, set->latch);
2208         bt_unpinlatch (set->latch);
2209         bt_unpinpool (set->pool);
2210
2211         bt_unlockpage (BtLockParent, right->latch);
2212         bt_unpinlatch (right->latch);
2213         return 0;
2214 }
2215
2216 //      install new key and value onto page
2217 //      page must already be checked for
2218 //      adequate space
2219
2220 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2221 {
2222 uint idx, librarian;
2223 BtSlot *node;
2224
2225         //      if found slot > desired slot and previous slot
2226         //      is a librarian slot, use it
2227
2228         if( slot > 1 )
2229           if( slotptr(set->page, slot-1)->type == Librarian )
2230                 slot--;
2231
2232         // copy value onto page
2233
2234         set->page->min -= vallen + 1; // reset lowest used offset
2235         ((unsigned char *)set->page)[set->page->min] = vallen;
2236         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2237
2238         // copy key onto page
2239
2240         set->page->min -= keylen + 1; // reset lowest used offset
2241         ((unsigned char *)set->page)[set->page->min] = keylen;
2242         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2243
2244         //      find first empty slot
2245
2246         for( idx = slot; idx < set->page->cnt; idx++ )
2247           if( slotptr(set->page, idx)->dead )
2248                 break;
2249
2250         // now insert key into array before slot
2251
2252         if( idx == set->page->cnt )
2253                 idx += 2, set->page->cnt += 2, librarian = 2;
2254         else
2255                 librarian = 1;
2256
2257         set->page->act++;
2258
2259         while( idx > slot + librarian - 1 )
2260                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2261
2262         //      add librarian slot
2263
2264         if( librarian > 1 ) {
2265                 node = slotptr(set->page, slot++);
2266                 node->off = set->page->min;
2267                 node->type = Librarian;
2268                 node->dead = 1;
2269         }
2270
2271         //      fill in new slot
2272
2273         node = slotptr(set->page, slot);
2274         node->off = set->page->min;
2275         node->type = type;
2276         node->dead = 0;
2277
2278         bt_unlockpage (BtLockWrite, set->latch);
2279         bt_unpinlatch (set->latch);
2280         bt_unpinpool (set->pool);
2281         return 0;
2282 }
2283
2284 //  Insert new key into the btree at given level.
2285 //      either add a new key or update/add an existing one
2286
2287 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2288 {
2289 unsigned char newkey[256];
2290 uint slot, idx, len;
2291 BtPageSet set[1];
2292 uid sequence;
2293 BtKey ptr;
2294 BtVal val;
2295 uint type;
2296
2297   // set up the key we're working on
2298
2299   memcpy (newkey + 1, key, keylen);
2300   newkey[0] = keylen;
2301
2302   // is this a non-unique index value?
2303
2304   if( unique )
2305         type = Unique;
2306   else {
2307         type = Duplicate;
2308         sequence = bt_newdup (bt);
2309         bt_putid (newkey + *newkey + 1, sequence);
2310         *newkey += BtId;
2311   }
2312
2313   while( 1 ) { // find the page and slot for the current key
2314         if( slot = bt_loadpage (bt, set, newkey + 1, *newkey, lvl, BtLockWrite) )
2315                 ptr = keyptr(set->page, slot);
2316         else {
2317                 if( !bt->err )
2318                         bt->err = BTERR_ovflw;
2319                 return bt->err;
2320         }
2321
2322         // if librarian slot == found slot, advance to real slot
2323
2324         if( slotptr(set->page, slot)->type == Librarian )
2325           if( !keycmp (ptr, key, keylen) )
2326                 ptr = keyptr(set->page, ++slot);
2327
2328         len = ptr->len;
2329
2330         if( slotptr(set->page, slot)->type == Duplicate )
2331                 len -= BtId;
2332
2333         //  if inserting a duplicate key or unique key
2334         //      check for adequate space on the page
2335         //      and insert the new key before slot.
2336
2337         if( unique && (len != *newkey || memcmp (ptr->key, newkey+1, *newkey)) || !unique ) {
2338           if( !(slot = bt_cleanpage (bt, set->page, *newkey, slot, vallen)) )
2339                 if( bt_splitpage (bt, set) )
2340                   return bt->err;
2341                 else
2342                   continue;
2343
2344           return bt_insertslot (bt, set, slot, newkey + 1, *newkey, value, vallen, type);
2345         }
2346
2347         // if key already exists, update value and return
2348
2349         if( val = valptr(set->page, slot), val->len >= vallen ) {
2350                 if( slotptr(set->page, slot)->dead )
2351                         set->page->act++;
2352                 set->page->garbage += val->len - vallen;
2353                 slotptr(set->page, slot)->dead = 0;
2354                 val->len = vallen;
2355                 memcpy (val->value, value, vallen);
2356                 bt_unlockpage(BtLockWrite, set->latch);
2357                 bt_unpinlatch (set->latch);
2358                 bt_unpinpool (set->pool);
2359                 return 0;
2360         }
2361
2362         //      new update value doesn't fit in existing value area
2363
2364         if( !slotptr(set->page, slot)->dead )
2365                 set->page->garbage += val->len + ptr->len + 2;
2366         else {
2367                 slotptr(set->page, slot)->dead = 0;
2368                 set->page->act++;
2369         }
2370
2371         if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2372           if( bt_splitpage (bt, set) )
2373                 return bt->err;
2374           else
2375                 continue;
2376
2377         set->page->min -= vallen + 1;
2378         ((unsigned char *)set->page)[set->page->min] = vallen;
2379         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
2380
2381         set->page->min -= keylen + 1;
2382         ((unsigned char *)set->page)[set->page->min] = keylen;
2383         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
2384         
2385         slotptr(set->page, slot)->off = set->page->min;
2386         bt_unlockpage(BtLockWrite, set->latch);
2387         bt_unpinlatch (set->latch);
2388         bt_unpinpool (set->pool);
2389         return 0;
2390   }
2391   return 0;
2392 }
2393
2394 //  return next slot for cursor page
2395 //  or slide cursor right into next page
2396
2397 uint bt_nextkey (BtDb *bt, uint slot)
2398 {
2399 BtPageSet set[1];
2400 uid right;
2401
2402   do {
2403         right = bt_getid(bt->cursor->right);
2404
2405         while( slot++ < bt->cursor->cnt )
2406           if( slotptr(bt->cursor,slot)->dead )
2407                 continue;
2408           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2409                 return slot;
2410           else
2411                 break;
2412
2413         if( !right )
2414                 break;
2415
2416         bt->cursor_page = right;
2417
2418         if( set->pool = bt_pinpool (bt, right) )
2419                 set->page = bt_page (bt, set->pool, right);
2420         else
2421                 return 0;
2422
2423         set->latch = bt_pinlatch (bt, right);
2424     bt_lockpage(BtLockRead, set->latch);
2425
2426         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2427
2428         bt_unlockpage(BtLockRead, set->latch);
2429         bt_unpinlatch (set->latch);
2430         bt_unpinpool (set->pool);
2431         slot = 0;
2432
2433   } while( 1 );
2434
2435   return bt->err = 0;
2436 }
2437
2438 //  cache page of keys into cursor and return starting slot for given key
2439
2440 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2441 {
2442 BtPageSet set[1];
2443 uint slot;
2444
2445         // cache page for retrieval
2446
2447         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2448           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2449         else
2450           return 0;
2451
2452         bt->cursor_page = set->page_no;
2453
2454         bt_unlockpage(BtLockRead, set->latch);
2455         bt_unpinlatch (set->latch);
2456         bt_unpinpool (set->pool);
2457         return slot;
2458 }
2459
2460 BtKey bt_key(BtDb *bt, uint slot)
2461 {
2462         return keyptr(bt->cursor, slot);
2463 }
2464
2465 BtVal bt_val(BtDb *bt, uint slot)
2466 {
2467         return valptr(bt->cursor,slot);
2468 }
2469
2470 #ifdef STANDALONE
2471
2472 #ifndef unix
2473 double getCpuTime(int type)
2474 {
2475 FILETIME crtime[1];
2476 FILETIME xittime[1];
2477 FILETIME systime[1];
2478 FILETIME usrtime[1];
2479 SYSTEMTIME timeconv[1];
2480 double ans = 0;
2481
2482         memset (timeconv, 0, sizeof(SYSTEMTIME));
2483
2484         switch( type ) {
2485         case 0:
2486                 GetSystemTimeAsFileTime (xittime);
2487                 FileTimeToSystemTime (xittime, timeconv);
2488                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2489                 break;
2490         case 1:
2491                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2492                 FileTimeToSystemTime (usrtime, timeconv);
2493                 break;
2494         case 2:
2495                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2496                 FileTimeToSystemTime (systime, timeconv);
2497                 break;
2498         }
2499
2500         ans += (double)timeconv->wHour * 3600;
2501         ans += (double)timeconv->wMinute * 60;
2502         ans += (double)timeconv->wSecond;
2503         ans += (double)timeconv->wMilliseconds / 1000;
2504         return ans;
2505 }
2506 #else
2507 #include <time.h>
2508 #include <sys/resource.h>
2509
2510 double getCpuTime(int type)
2511 {
2512 struct rusage used[1];
2513 struct timeval tv[1];
2514
2515         switch( type ) {
2516         case 0:
2517                 gettimeofday(tv, NULL);
2518                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2519
2520         case 1:
2521                 getrusage(RUSAGE_SELF, used);
2522                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2523
2524         case 2:
2525                 getrusage(RUSAGE_SELF, used);
2526                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2527         }
2528
2529         return 0;
2530 }
2531 #endif
2532
2533 uint bt_latchaudit (BtDb *bt)
2534 {
2535 ushort idx, hashidx;
2536 uid next, page_no;
2537 BtLatchSet *latch;
2538 uint cnt = 0;
2539 BtKey ptr;
2540
2541 #ifdef unix
2542         posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2543 #endif
2544         if( *(ushort *)(bt->mgr->latchmgr->lock) )
2545                 fprintf(stderr, "Alloc page locked\n");
2546         *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2547
2548         for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2549                 latch = bt->mgr->latchsets + idx;
2550                 if( *latch->readwr->rin & MASK )
2551                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2552                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2553
2554                 if( *latch->access->rin & MASK )
2555                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2556                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2557
2558                 if( *latch->parent->rin & MASK )
2559                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2560                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2561
2562                 if( latch->pin ) {
2563                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2564                         latch->pin = 0;
2565                 }
2566         }
2567
2568         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2569           if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2570                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2571
2572           *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2573
2574           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2575                 latch = bt->mgr->latchsets + idx;
2576                 if( *(ushort *)latch->busy )
2577                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2578                 *(ushort *)latch->busy = 0;
2579                 if( latch->pin )
2580                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2581           } while( idx = latch->next );
2582         }
2583
2584         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2585         page_no = LEAF_page;
2586
2587         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2588         off64_t off = page_no << bt->mgr->page_bits;
2589 #ifdef unix
2590                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2591 #else
2592                 DWORD amt[1];
2593
2594                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2595
2596                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2597                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2598
2599                   if( *amt <  bt->mgr->page_size )
2600                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2601 #endif
2602                 if( !bt->frame->free ) {
2603                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2604                   ptr = keyptr(bt->frame, idx+1);
2605                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2606                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2607                  }
2608                  if( !bt->frame->lvl )
2609                         cnt += bt->frame->act;
2610                 }
2611                 if( page_no > LEAF_page )
2612                         next = page_no + 1;
2613                 page_no = next;
2614         }
2615         return cnt - 1;
2616 }
2617
2618 typedef struct {
2619         char idx;
2620         char *type;
2621         char *infile;
2622         BtMgr *mgr;
2623         int num;
2624 } ThreadArg;
2625
2626 //  standalone program to index file of keys
2627 //  then list them onto std-out
2628
2629 #ifdef unix
2630 void *index_file (void *arg)
2631 #else
2632 uint __stdcall index_file (void *arg)
2633 #endif
2634 {
2635 int line = 0, found = 0, cnt = 0;
2636 uid next, page_no = LEAF_page;  // start on first page of leaves
2637 unsigned char key[256];
2638 ThreadArg *args = arg;
2639 int ch, len = 0, slot;
2640 BtPageSet set[1];
2641 BtKey ptr;
2642 BtDb *bt;
2643 FILE *in;
2644
2645         bt = bt_open (args->mgr);
2646
2647         switch(args->type[0] | 0x20)
2648         {
2649         case 'a':
2650                 fprintf(stderr, "started latch mgr audit\n");
2651                 cnt = bt_latchaudit (bt);
2652                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2653                 break;
2654
2655         case 'p':
2656                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2657                 if( in = fopen (args->infile, "rb") )
2658                   while( ch = getc(in), ch != EOF )
2659                         if( ch == '\n' )
2660                         {
2661                           line++;
2662
2663                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 0) )
2664                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2665                           len = 0;
2666                         }
2667                         else if( len < 255 )
2668                                 key[len++] = ch;
2669                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2670                 break;
2671
2672         case 'w':
2673                 fprintf(stderr, "started indexing for %s\n", args->infile);
2674                 if( in = fopen (args->infile, "rb") )
2675                   while( ch = getc(in), ch != EOF )
2676                         if( ch == '\n' )
2677                         {
2678                           line++;
2679
2680                           if( args->num == 1 )
2681                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2682
2683                           else if( args->num )
2684                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2685
2686                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 0) )
2687                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2688                           len = 0;
2689                         }
2690                         else if( len < 255 )
2691                                 key[len++] = ch;
2692                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2693                 break;
2694
2695         case 'd':
2696                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2697                 if( in = fopen (args->infile, "rb") )
2698                   while( ch = getc(in), ch != EOF )
2699                         if( ch == '\n' )
2700                         {
2701                           line++;
2702                           if( args->num == 1 )
2703                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2704
2705                           else if( args->num )
2706                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2707
2708                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2709                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2710                           ptr = (BtKey)(bt->key);
2711
2712                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2713                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2714                           found += bt->found;
2715                           len = 0;
2716                         }
2717                         else if( len < 255 )
2718                                 key[len++] = ch;
2719                 fprintf(stderr, "finished %s for %d keys, %d found\n", args->infile, line, found);
2720                 break;
2721
2722         case 'f':
2723                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2724                 if( in = fopen (args->infile, "rb") )
2725                   while( ch = getc(in), ch != EOF )
2726                         if( ch == '\n' )
2727                         {
2728                           line++;
2729                           if( args->num == 1 )
2730                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2731
2732                           else if( args->num )
2733                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2734
2735                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2736                                 found++;
2737                           else if( bt->err )
2738                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2739                           len = 0;
2740                         }
2741                         else if( len < 255 )
2742                                 key[len++] = ch;
2743                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2744                 break;
2745
2746         case 's':
2747                 fprintf(stderr, "started scanning\n");
2748                 do {
2749                         if( set->pool = bt_pinpool (bt, page_no) )
2750                                 set->page = bt_page (bt, set->pool, page_no);
2751                         else
2752                                 break;
2753                         set->latch = bt_pinlatch (bt, page_no);
2754                         bt_lockpage (BtLockRead, set->latch);
2755                         next = bt_getid (set->page->right);
2756
2757                         for( slot = 0; slot++ < set->page->cnt; )
2758                          if( next || slot < set->page->cnt )
2759                           if( !slotptr(set->page, slot)->dead ) {
2760                                 ptr = keyptr(set->page, slot);
2761                                 len = ptr->len;
2762
2763                             if( slotptr(set->page, slot)->type == Duplicate )
2764                                         len -= BtId;
2765
2766                                 fwrite (ptr->key, len, 1, stdout);
2767                                 fputc ('\n', stdout);
2768                                 cnt++;
2769                            }
2770
2771                         bt_unlockpage (BtLockRead, set->latch);
2772                         bt_unpinlatch (set->latch);
2773                         bt_unpinpool (set->pool);
2774                 } while( page_no = next );
2775
2776                 fprintf(stderr, " Total keys read %d\n", cnt);
2777                 break;
2778
2779         case 'c':
2780 #ifdef unix
2781                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2782 #endif
2783                 fprintf(stderr, "started counting\n");
2784                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2785                 page_no = LEAF_page;
2786
2787                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2788                 uid off = page_no << bt->mgr->page_bits;
2789 #ifdef unix
2790                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2791 #else
2792                 DWORD amt[1];
2793
2794                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2795
2796                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2797                         return bt->err = BTERR_map;
2798
2799                   if( *amt <  bt->mgr->page_size )
2800                         return bt->err = BTERR_map;
2801 #endif
2802                         if( !bt->frame->free && !bt->frame->lvl )
2803                                 cnt += bt->frame->act;
2804                         if( page_no > LEAF_page )
2805                                 next = page_no + 1;
2806                         page_no = next;
2807                 }
2808                 
2809                 cnt--;  // remove stopper key
2810                 fprintf(stderr, " Total keys read %d\n", cnt);
2811                 break;
2812         }
2813
2814         bt_close (bt);
2815 #ifdef unix
2816         return NULL;
2817 #else
2818         return 0;
2819 #endif
2820 }
2821
2822 typedef struct timeval timer;
2823
2824 int main (int argc, char **argv)
2825 {
2826 int idx, cnt, len, slot, err;
2827 int segsize, bits = 16;
2828 double start, stop;
2829 #ifdef unix
2830 pthread_t *threads;
2831 #else
2832 HANDLE *threads;
2833 #endif
2834 ThreadArg *args;
2835 uint poolsize = 0;
2836 float elapsed;
2837 int num = 0;
2838 char key[1];
2839 BtMgr *mgr;
2840 BtKey ptr;
2841 BtDb *bt;
2842
2843         if( argc < 3 ) {
2844                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2845                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2846                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2847                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2848                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2849                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2850                 exit(0);
2851         }
2852
2853         start = getCpuTime(0);
2854
2855         if( argc > 3 )
2856                 bits = atoi(argv[3]);
2857
2858         if( argc > 4 )
2859                 poolsize = atoi(argv[4]);
2860
2861         if( !poolsize )
2862                 fprintf (stderr, "Warning: no mapped_pool\n");
2863
2864         if( poolsize > 65535 )
2865                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2866
2867         if( argc > 5 )
2868                 segsize = atoi(argv[5]);
2869         else
2870                 segsize = 4;    // 16 pages per mmap segment
2871
2872         if( argc > 6 )
2873                 num = atoi(argv[6]);
2874
2875         cnt = argc - 7;
2876 #ifdef unix
2877         threads = malloc (cnt * sizeof(pthread_t));
2878 #else
2879         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2880 #endif
2881         args = malloc (cnt * sizeof(ThreadArg));
2882
2883         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2884
2885         if( !mgr ) {
2886                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2887                 exit (1);
2888         }
2889
2890         //      fire off threads
2891
2892         for( idx = 0; idx < cnt; idx++ ) {
2893                 args[idx].infile = argv[idx + 7];
2894                 args[idx].type = argv[2];
2895                 args[idx].mgr = mgr;
2896                 args[idx].num = num;
2897                 args[idx].idx = idx;
2898 #ifdef unix
2899                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2900                         fprintf(stderr, "Error creating thread %d\n", err);
2901 #else
2902                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2903 #endif
2904         }
2905
2906         //      wait for termination
2907
2908 #ifdef unix
2909         for( idx = 0; idx < cnt; idx++ )
2910                 pthread_join (threads[idx], NULL);
2911 #else
2912         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2913
2914         for( idx = 0; idx < cnt; idx++ )
2915                 CloseHandle(threads[idx]);
2916
2917 #endif
2918         elapsed = getCpuTime(0) - start;
2919         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2920         elapsed = getCpuTime(1);
2921         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2922         elapsed = getCpuTime(2);
2923         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2924
2925         bt_mgrclose (mgr);
2926 }
2927
2928 #endif  //STANDALONE