]> pd.if.org Git - btree/blob - threads2j.c
Finish rework of bt_deletekey, w/bug fixes
[btree] / threads2j.c
1 // btree version threads2j linux futex concurrency version
2 //      with reworked bt_deletekey
3 // 12 FEB 2014
4
5 // author: karl malbrain, malbrain@cal.berkeley.edu
6
7 /*
8 This work, including the source code, documentation
9 and related data, is placed into the public domain.
10
11 The orginal author is Karl Malbrain.
12
13 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
14 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
15 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
16 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
17 RESULTING FROM THE USE, MODIFICATION, OR
18 REDISTRIBUTION OF THIS SOFTWARE.
19 */
20
21 // Please see the project home page for documentation
22 // code.google.com/p/high-concurrency-btree
23
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26
27 #ifdef linux
28 #define _GNU_SOURCE
29 #include <linux/futex.h>
30 #define SYS_futex 202
31 #endif
32
33 #ifdef unix
34 #include <unistd.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <fcntl.h>
38 #include <sys/time.h>
39 #include <sys/mman.h>
40 #include <errno.h>
41 #include <pthread.h>
42 #include <limits.h>
43 #else
44 #define WIN32_LEAN_AND_MEAN
45 #include <windows.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #include <time.h>
49 #include <fcntl.h>
50 #include <process.h>
51 #include <intrin.h>
52 #endif
53
54 #include <memory.h>
55 #include <string.h>
56 #include <stddef.h>
57
58 typedef unsigned long long      uid;
59
60 #ifndef unix
61 typedef unsigned long long      off64_t;
62 typedef unsigned short          ushort;
63 typedef unsigned int            uint;
64 #endif
65
66 #define BT_ro 0x6f72    // ro
67 #define BT_rw 0x7772    // rw
68
69 #define BT_latchtable   128                                     // number of latch manager slots
70
71 #define BT_maxbits              24                                      // maximum page size in bits
72 #define BT_minbits              9                                       // minimum page size in bits
73 #define BT_minpage              (1 << BT_minbits)       // minimum page size
74 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
75
76 /*
77 There are five lock types for each node in three independent sets: 
78 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
79 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
80 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
81 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
82 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
83 */
84
85 typedef enum{
86         BtLockAccess,
87         BtLockDelete,
88         BtLockRead,
89         BtLockWrite,
90         BtLockParent
91 } BtLock;
92
93 //      mode & definition for latch implementation
94
95 enum {
96         Mutex = 1 << 0,         // the mutex bit
97         Write = 1 << 1,         // the writers bit
98         Share = 1 << 2,         // reader count
99         PendRd = 1 << 12,       // reader contended count
100         PendWr = 1 << 22        // writer contended count
101 } LockMode;
102
103 enum {
104         QueRd = 1,      // reader queue
105         QueWr = 2       // writer queue
106 } RWQueue;
107
108 // share is count of read accessors
109 // grant write lock when share == 0
110
111 typedef struct {
112         volatile uint mutex:1;          // 1 = busy
113         volatile uint write:1;          // 1 = exclusive
114         volatile uint share:10;         // count of readers holding locks
115         volatile uint readwait:10;      // count of readers waiting
116         volatile uint writewait:10;     // count of writers waiting
117 } BtLatch;
118
119 //      Define the length of the page and key pointers
120
121 #define BtId 6
122
123 //      Page key slot definition.
124
125 //      If BT_maxbits is 15 or less, you can save 4 bytes
126 //      for each key stored by making the first two uints
127 //      into ushorts.  You can also save 4 bytes by removing
128 //      the tod field from the key.
129
130 //      Keys are marked dead, but remain on the page until
131 //      it cleanup is called. The fence key (highest key) for
132 //      the page is always present, even after cleanup.
133
134 typedef struct {
135         uint off:BT_maxbits;            // page offset for key start
136         uint dead:1;                            // set for deleted key
137         uint tod;                                       // time-stamp for key
138         unsigned char id[BtId];         // id associated with key
139 } BtSlot;
140
141 //      The key structure occupies space at the upper end of
142 //      each page.  It's a length byte followed by the value
143 //      bytes.
144
145 typedef struct {
146         unsigned char len;
147         unsigned char key[1];
148 } *BtKey;
149
150 //      The first part of an index page.
151 //      It is immediately followed
152 //      by the BtSlot array of keys.
153
154 typedef struct BtPage_ {
155         uint cnt;                                       // count of keys in page
156         uint act;                                       // count of active keys
157         uint min;                                       // next key offset
158         unsigned char bits:7;           // page size in bits
159         unsigned char free:1;           // page is on free list
160         unsigned char lvl:4;            // level of page
161         unsigned char kill:1;           // page is being deleted
162         unsigned char dirty:1;          // page has deleted keys
163         unsigned char posted:1;         // page fence has posted
164         unsigned char goright:1;        // page is being killed, continue to right
165         unsigned char right[BtId];      // page number to right
166         unsigned char fence[256];       // page fence key
167 } *BtPage;
168
169 //  hash table entries
170
171 typedef struct {
172         BtLatch latch[1];
173         volatile ushort slot;           // Latch table entry at head of chain
174 } BtHashEntry;
175
176 //      latch manager table structure
177
178 typedef struct {
179         BtLatch readwr[1];                      // read/write page lock
180         BtLatch access[1];                      // Access Intent/Page delete
181         BtLatch parent[1];                      // adoption of foster children
182         BtLatch busy[1];                        // slot is being moved between chains
183         volatile ushort next;           // next entry in hash table chain
184         volatile ushort prev;           // prev entry in hash table chain
185         volatile ushort pin;            // number of outstanding locks
186         volatile ushort hash;           // hash slot entry is under
187         volatile uid page_no;           // latch set page number
188 } BtLatchSet;
189
190 //      The memory mapping pool table buffer manager entry
191
192 typedef struct {
193         unsigned long long int lru;     // number of times accessed
194         uid  basepage;                          // mapped base page number
195         char *map;                                      // mapped memory pointer
196         ushort slot;                            // slot index in this array
197         ushort pin;                                     // mapped page pin counter
198         void *hashprev;                         // previous pool entry for the same hash idx
199         void *hashnext;                         // next pool entry for the same hash idx
200 #ifndef unix
201         HANDLE hmap;                            // Windows memory mapping handle
202 #endif
203 } BtPool;
204
205 //  The loadpage interface object
206
207 typedef struct {
208         uid page_no;            // current page number
209         BtPage page;            // current page pointer
210         BtPool *pool;           // current page pool
211         BtLatchSet *latch;      // current page latch set
212 } BtPageSet;
213
214 //      structure for latch manager on ALLOC_page
215
216 typedef struct {
217         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
218         BtLatch lock[1];                        // allocation area lite latch
219         ushort latchdeployed;           // highest number of latch entries deployed
220         ushort nlatchpage;                      // number of latch pages at BT_latch
221         ushort latchtotal;                      // number of page latch entries
222         ushort latchhash;                       // number of latch hash table slots
223         ushort latchvictim;                     // next latch entry to examine
224         BtHashEntry table[0];           // the hash table
225 } BtLatchMgr;
226
227 //      The object structure for Btree access
228
229 typedef struct {
230         uint page_size;                         // page size    
231         uint page_bits;                         // page size in bits    
232         uint seg_bits;                          // seg size in pages in bits
233         uint mode;                                      // read-write mode
234 #ifdef unix
235         int idx;
236 #else
237         HANDLE idx;
238 #endif
239         ushort poolcnt;                         // highest page pool node in use
240         ushort poolmax;                         // highest page pool node allocated
241         ushort poolmask;                        // total number of pages in mmap segment - 1
242         ushort evicted;                         // last evicted hash table slot
243         ushort hashsize;                        // size of Hash Table for pool entries
244         ushort *hash;                           // pool index for hash entries
245         BtLatch *latch;                         // latches for pool hash slots
246         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
247         BtLatchSet *latchsets;          // mapped latch set from latch pages
248         BtPool *pool;                           // memory pool page segments
249 #ifndef unix
250         HANDLE halloc;                          // allocation and latch table handle
251 #endif
252 } BtMgr;
253
254 typedef struct {
255         BtMgr *mgr;                     // buffer manager for thread
256         BtPage cursor;          // cached frame for start/next (never mapped)
257         BtPage frame;           // spare frame for the page split (never mapped)
258         BtPage zero;            // page of zeroes to extend the file (never mapped)
259         uid cursor_page;        // current cursor page number   
260         unsigned char *mem;     // frame, cursor, page memory buffer
261         int found;                      // last delete or insert was found
262         int err;                        // last error
263 } BtDb;
264
265 typedef enum {
266         BTERR_ok = 0,
267         BTERR_struct,
268         BTERR_ovflw,
269         BTERR_lock,
270         BTERR_map,
271         BTERR_wrt,
272         BTERR_hash
273 } BTERR;
274
275 // B-Tree functions
276 extern void bt_close (BtDb *bt);
277 extern BtDb *bt_open (BtMgr *mgr);
278 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl);
279 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
280 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
281 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
282 extern uint bt_nextkey   (BtDb *bt, uint slot);
283
284 //      internal functions
285 BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence);
286
287 //      manager functions
288 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
289 void bt_mgrclose (BtMgr *mgr);
290
291 //  Helper functions to return slot values
292
293 extern BtKey bt_key (BtDb *bt, uint slot);
294 extern uid bt_uid (BtDb *bt, uint slot);
295 extern uint bt_tod (BtDb *bt, uint slot);
296
297 //  BTree page number constants
298 #define ALLOC_page              0       // allocation & lock manager hash table
299 #define ROOT_page               1       // root of the btree
300 #define LEAF_page               2       // first page of leaves
301 #define LATCH_page              3       // pages for lock manager
302
303 //      Number of levels to create in a new BTree
304
305 #define MIN_lvl                 2
306
307 //  The page is allocated from low and hi ends.
308 //  The key offsets and row-id's are allocated
309 //  from the bottom, while the text of the key
310 //  is allocated from the top.  When the two
311 //  areas meet, the page is split into two.
312
313 //  A key consists of a length byte, two bytes of
314 //  index number (0 - 65534), and up to 253 bytes
315 //  of key value.  Duplicate keys are discarded.
316 //  Associated with each key is a 48 bit row-id.
317
318 //  The b-tree root is always located at page 1.
319 //      The first leaf page of level zero is always
320 //      located on page 2.
321
322 //      The b-tree pages are linked with next
323 //      pointers to facilitate enumerators,
324 //      and provide for concurrency.
325
326 //      When to root page fills, it is split in two and
327 //      the tree height is raised by a new root at page
328 //      one with two keys.
329
330 //      Deleted keys are marked with a dead bit until
331 //      page cleanup. The fence key for a node is
332 //      present in a special array.
333
334 //  Groups of pages called segments from the btree are optionally
335 //  cached with a memory mapped pool. A hash table is used to keep
336 //  track of the cached segments.  This behaviour is controlled
337 //  by the cache block size parameter to bt_open.
338
339 //  To achieve maximum concurrency one page is locked at a time
340 //  as the tree is traversed to find leaf key in question. The right
341 //  page numbers are used in cases where the page is being split,
342 //      or consolidated.
343
344 //  Page 0 is dedicated to lock for new page extensions,
345 //      and chains empty pages together for reuse.
346
347 //      The ParentModification lock on a node is obtained to serialize posting
348 //      or changing the fence key for a node.
349
350 //      Empty pages are chained together through the ALLOC page and reused.
351
352 //      Access macros to address slot and key values from the page
353 //      Page slots use 1 based indexing.
354
355 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
356 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
357
358 void bt_putid(unsigned char *dest, uid id)
359 {
360 int i = BtId;
361
362         while( i-- )
363                 dest[i] = (unsigned char)id, id >>= 8;
364 }
365
366 uid bt_getid(unsigned char *src)
367 {
368 uid id = 0;
369 int i;
370
371         for( i = 0; i < BtId; i++ )
372                 id <<= 8, id |= *src++; 
373
374         return id;
375 }
376
377 //      Latch Manager
378
379 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
380 {
381         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
382 }
383
384 //      wait until write lock mode is clear
385 //      and add 1 to the share count
386
387 void bt_spinreadlock(BtLatch *latch, int private)
388 {
389 uint prev;
390
391   if( private )
392         private = FUTEX_PRIVATE_FLAG;
393
394   while( 1 ) {
395         //      obtain latch mutex
396         if( __sync_fetch_and_or((uint *)latch, Mutex) & Mutex ) {
397                 sched_yield();
398                 continue;
399         }
400
401         //  wait for writers to clear
402         //      increment read waiters and wait
403
404         if( latch->write || latch->writewait ) {
405                 __sync_fetch_and_add ((uint *)latch, PendRd);
406                 prev = __sync_fetch_and_and ((uint *)latch, ~Mutex) & ~Mutex;
407                 sys_futex( (uint *)latch, FUTEX_WAIT_BITSET | private, prev, NULL, NULL, QueRd );
408                 __sync_fetch_and_sub ((uint *)latch, PendRd);
409                 continue;
410         }
411         
412         // increment reader lock count
413         // and release latch mutex
414
415         __sync_fetch_and_add ((uint *)latch, Share);
416         __sync_fetch_and_and ((uint *)latch, ~Mutex);
417         return;
418   }
419 }
420
421 //      wait for other read and write latches to relinquish
422
423 void bt_spinwritelock(BtLatch *latch, int private)
424 {
425 uint prev;
426
427   if( private )
428         private = FUTEX_PRIVATE_FLAG;
429
430   while( 1 ) {
431         //      obtain latch mutex
432         if( __sync_fetch_and_or((uint *)latch, Mutex) & Mutex ) {
433                 sched_yield();
434                 continue;
435         }
436
437         //      wait for write and reader count to clear
438
439         if( latch->write || latch->share ) {
440                 __sync_fetch_and_add ((uint *)latch, PendWr);
441                 prev = __sync_fetch_and_and ((uint *)latch, ~Mutex) & ~Mutex;
442                 sys_futex( (uint *)latch, FUTEX_WAIT_BITSET | private, prev, NULL, NULL, QueWr );
443                 __sync_fetch_and_sub ((uint *)latch, PendWr);
444                 continue;
445         }
446         
447         //      take write mutex
448         //      release latch mutex
449
450         __sync_fetch_and_or ((uint *)latch, Write);
451         __sync_fetch_and_and ((uint *)latch, ~Mutex);
452         return;
453   }
454 }
455
456 //      try to obtain write lock
457
458 //      return 1 if obtained,
459 //              0 otherwise
460
461 int bt_spinwritetry(BtLatch *latch)
462 {
463 int ans;
464
465         //      try for mutex,
466         //      abandon request if not taken
467
468         if( __sync_fetch_and_or((uint *)latch, Mutex) & Mutex )
469                 return 0;
470
471         //      see if write mode is available
472
473         if( !latch->write && !latch->share ) {
474                 __sync_fetch_and_or ((uint *)latch, Write);
475                 ans = 1;
476         } else
477                 ans = 0;
478
479         // release latch mutex
480
481         __sync_fetch_and_and ((uint *)latch, ~Mutex);
482         return ans;
483 }
484
485 //      clear write lock
486
487 void bt_spinreleasewrite(BtLatch *latch, int private)
488 {
489   if( private )
490         private = FUTEX_PRIVATE_FLAG;
491
492         //      obtain latch mutex
493
494         while( __sync_fetch_and_or((uint *)latch, Mutex) & Mutex )
495                 sched_yield();
496
497         __sync_fetch_and_and ((uint *)latch, ~Write);
498
499         // favor writers
500
501         if( latch->writewait )
502           if( sys_futex( (uint *)latch, FUTEX_WAKE_BITSET | private, 1, NULL, NULL, QueWr ) )
503                 goto wakexit;
504
505         if( latch->readwait )
506                 sys_futex( (uint *)latch, FUTEX_WAKE_BITSET | private, INT_MAX, NULL, NULL, QueRd );
507
508         // release latch mutex
509
510 wakexit:
511         __sync_fetch_and_and ((uint *)latch, ~Mutex);
512 }
513
514 //      decrement reader count
515
516 void bt_spinreleaseread(BtLatch *latch, int private)
517 {
518   if( private )
519         private = FUTEX_PRIVATE_FLAG;
520
521         //      obtain latch mutex
522
523         while( __sync_fetch_and_or((uint *)latch, Mutex) & Mutex )
524                 sched_yield();
525
526         __sync_fetch_and_sub ((uint *)latch, Share);
527
528         // wake waiting writers
529
530         if( !latch->share && latch->writewait )
531                 sys_futex( (uint *)latch, FUTEX_WAKE_BITSET | private, 1, NULL, NULL, QueWr );
532
533         // release latch mutex
534
535         __sync_fetch_and_and ((uint *)latch, ~Mutex);
536 }
537
538 //      link latch table entry into latch hash table
539
540 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
541 {
542 BtLatchSet *set = bt->mgr->latchsets + victim;
543
544         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
545                 bt->mgr->latchsets[set->next].prev = victim;
546
547         bt->mgr->latchmgr->table[hashidx].slot = victim;
548         set->page_no = page_no;
549         set->hash = hashidx;
550         set->prev = 0;
551 }
552
553 //      release latch pin
554
555 void bt_unpinlatch (BtLatchSet *set)
556 {
557 #ifdef unix
558         __sync_fetch_and_add(&set->pin, -1);
559 #else
560         _InterlockedDecrement16 (&set->pin);
561 #endif
562 }
563
564 //      find existing latchset or inspire new one
565 //      return with latchset pinned
566
567 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
568 {
569 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
570 ushort slot, avail = 0, victim, idx;
571 BtLatchSet *set;
572
573         //  obtain read lock on hash table entry
574
575         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch, 0);
576
577         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
578         {
579                 set = bt->mgr->latchsets + slot;
580                 if( page_no == set->page_no )
581                         break;
582         } while( slot = set->next );
583
584         if( slot ) {
585 #ifdef unix
586                 __sync_fetch_and_add(&set->pin, 1);
587 #else
588                 _InterlockedIncrement16 (&set->pin);
589 #endif
590         }
591
592     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch, 0);
593
594         if( slot )
595                 return set;
596
597   //  try again, this time with write lock
598
599   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch, 0);
600
601   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
602   {
603         set = bt->mgr->latchsets + slot;
604         if( page_no == set->page_no )
605                 break;
606         if( !set->pin && !avail )
607                 avail = slot;
608   } while( slot = set->next );
609
610   //  found our entry, or take over an unpinned one
611
612   if( slot || (slot = avail) ) {
613         set = bt->mgr->latchsets + slot;
614 #ifdef unix
615         __sync_fetch_and_add(&set->pin, 1);
616 #else
617         _InterlockedIncrement16 (&set->pin);
618 #endif
619         set->page_no = page_no;
620         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch, 0);
621         return set;
622   }
623
624         //  see if there are any unused entries
625 #ifdef unix
626         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
627 #else
628         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
629 #endif
630
631         if( victim < bt->mgr->latchmgr->latchtotal ) {
632                 set = bt->mgr->latchsets + victim;
633 #ifdef unix
634                 __sync_fetch_and_add(&set->pin, 1);
635 #else
636                 _InterlockedIncrement16 (&set->pin);
637 #endif
638                 bt_latchlink (bt, hashidx, victim, page_no);
639                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch, 0);
640                 return set;
641         }
642
643 #ifdef unix
644         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
645 #else
646         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
647 #endif
648   //  find and reuse previous lock entry
649
650   while( 1 ) {
651 #ifdef unix
652         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
653 #else
654         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
655 #endif
656         //      we don't use slot zero
657
658         if( victim %= bt->mgr->latchmgr->latchtotal )
659                 set = bt->mgr->latchsets + victim;
660         else
661                 continue;
662
663         //      take control of our slot
664         //      from other threads
665
666         if( set->pin || !bt_spinwritetry (set->busy) )
667                 continue;
668
669         idx = set->hash;
670
671         // try to get write lock on hash chain
672         //      skip entry if not obtained
673         //      or has outstanding locks
674
675         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
676                 bt_spinreleasewrite (set->busy, 0);
677                 continue;
678         }
679
680         if( set->pin ) {
681                 bt_spinreleasewrite (set->busy, 0);
682                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch, 0);
683                 continue;
684         }
685
686         //  unlink our available victim from its hash chain
687
688         if( set->prev )
689                 bt->mgr->latchsets[set->prev].next = set->next;
690         else
691                 bt->mgr->latchmgr->table[idx].slot = set->next;
692
693         if( set->next )
694                 bt->mgr->latchsets[set->next].prev = set->prev;
695
696         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch, 0);
697 #ifdef unix
698         __sync_fetch_and_add(&set->pin, 1);
699 #else
700         _InterlockedIncrement16 (&set->pin);
701 #endif
702         bt_latchlink (bt, hashidx, victim, page_no);
703         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch, 0);
704         bt_spinreleasewrite (set->busy, 0);
705         return set;
706   }
707 }
708
709 void bt_mgrclose (BtMgr *mgr)
710 {
711 BtPool *pool;
712 uint slot;
713
714         // release mapped pages
715         //      note that slot zero is never used
716
717         for( slot = 1; slot < mgr->poolmax; slot++ ) {
718                 pool = mgr->pool + slot;
719                 if( pool->slot )
720 #ifdef unix
721                         munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits);
722 #else
723                 {
724                         FlushViewOfFile(pool->map, 0);
725                         UnmapViewOfFile(pool->map);
726                         CloseHandle(pool->hmap);
727                 }
728 #endif
729         }
730
731 #ifdef unix
732         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
733         munmap (mgr->latchmgr, mgr->page_size);
734 #else
735         FlushViewOfFile(mgr->latchmgr, 0);
736         UnmapViewOfFile(mgr->latchmgr);
737         CloseHandle(mgr->halloc);
738 #endif
739 #ifdef unix
740         close (mgr->idx);
741         free (mgr->pool);
742         free (mgr->hash);
743         free (mgr->latch);
744         free (mgr);
745 #else
746         FlushFileBuffers(mgr->idx);
747         CloseHandle(mgr->idx);
748         GlobalFree (mgr->pool);
749         GlobalFree (mgr->hash);
750         GlobalFree (mgr->latch);
751         GlobalFree (mgr);
752 #endif
753 }
754
755 //      close and release memory
756
757 void bt_close (BtDb *bt)
758 {
759 #ifdef unix
760         if ( bt->mem )
761                 free (bt->mem);
762 #else
763         if ( bt->mem)
764                 VirtualFree (bt->mem, 0, MEM_RELEASE);
765 #endif
766         free (bt);
767 }
768
769 //  open/create new btree buffer manager
770
771 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
772 //              size of mapped page pool (e.g. 8192)
773
774 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
775 {
776 uint lvl, attr, cacheblk, last, slot, idx;
777 uint nlatchpage, latchhash;
778 BtLatchMgr *latchmgr;
779 off64_t size;
780 uint amt[1];
781 BtMgr* mgr;
782 BtKey key;
783 int flag;
784 #ifndef unix
785 SYSTEM_INFO sysinfo[1];
786 #endif
787
788         // determine sanity of page size and buffer pool
789
790         if( bits > BT_maxbits )
791                 bits = BT_maxbits;
792         else if( bits < BT_minbits )
793                 bits = BT_minbits;
794
795         if( !poolmax )
796                 return NULL;    // must have buffer pool
797
798 #ifdef unix
799         mgr = calloc (1, sizeof(BtMgr));
800         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
801
802         if( mgr->idx == -1 )
803                 return free(mgr), NULL;
804         
805         cacheblk = 4096;        // minimum mmap segment size for unix
806
807 #else
808         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
809         attr = FILE_ATTRIBUTE_NORMAL;
810         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
811
812         if( mgr->idx == INVALID_HANDLE_VALUE )
813                 return GlobalFree(mgr), NULL;
814
815         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
816         GetSystemInfo(sysinfo);
817         cacheblk = sysinfo->dwAllocationGranularity;
818 #endif
819
820 #ifdef unix
821         latchmgr = malloc (BT_maxpage);
822         *amt = 0;
823
824         // read minimum page size to get root info
825
826         if( size = lseek (mgr->idx, 0L, 2) ) {
827                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
828                         bits = latchmgr->alloc->bits;
829                 else
830                         return free(mgr), free(latchmgr), NULL;
831         } else if( mode == BT_ro )
832                 return free(latchmgr), free (mgr), NULL;
833 #else
834         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
835         size = GetFileSize(mgr->idx, amt);
836
837         if( size || *amt ) {
838                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
839                         return bt_mgrclose (mgr), NULL;
840                 bits = latchmgr->alloc->bits;
841         } else if( mode == BT_ro )
842                 return bt_mgrclose (mgr), NULL;
843 #endif
844
845         mgr->page_size = 1 << bits;
846         mgr->page_bits = bits;
847
848         mgr->poolmax = poolmax;
849         mgr->mode = mode;
850
851         if( cacheblk < mgr->page_size )
852                 cacheblk = mgr->page_size;
853
854         //  mask for partial memmaps
855
856         mgr->poolmask = (cacheblk >> bits) - 1;
857
858         //      see if requested size of pages per memmap is greater
859
860         if( (1 << segsize) > mgr->poolmask )
861                 mgr->poolmask = (1 << segsize) - 1;
862
863         mgr->seg_bits = 0;
864
865         while( (1 << mgr->seg_bits) <= mgr->poolmask )
866                 mgr->seg_bits++;
867
868         mgr->hashsize = hashsize;
869
870 #ifdef unix
871         mgr->pool = calloc (poolmax, sizeof(BtPool));
872         mgr->hash = calloc (hashsize, sizeof(ushort));
873         mgr->latch = calloc (hashsize, sizeof(BtLatch));
874 #else
875         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
876         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
877         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtLatch));
878 #endif
879
880         if( size || *amt )
881                 goto mgrlatch;
882
883         // initialize an empty b-tree with latch page, root page, page of leaves
884         // and page(s) of latches
885
886         memset (latchmgr, 0, 1 << bits);
887         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
888         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
889         latchmgr->alloc->bits = mgr->page_bits;
890
891         latchmgr->nlatchpage = nlatchpage;
892         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
893
894         //  initialize latch manager
895
896         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
897
898         //      size of hash table = total number of latchsets
899
900         if( latchhash > latchmgr->latchtotal )
901                 latchhash = latchmgr->latchtotal;
902
903         latchmgr->latchhash = latchhash;
904
905 #ifdef unix
906         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
907                 return bt_mgrclose (mgr), NULL;
908 #else
909         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
910                 return bt_mgrclose (mgr), NULL;
911
912         if( *amt < mgr->page_size )
913                 return bt_mgrclose (mgr), NULL;
914 #endif
915
916         memset (latchmgr, 0, 1 << bits);
917         latchmgr->alloc->bits = mgr->page_bits;
918
919         for( lvl=MIN_lvl; lvl--; ) {
920                 slotptr(latchmgr->alloc, 1)->off = offsetof(struct BtPage_, fence);
921                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
922                 latchmgr->alloc->fence[0] = 2;                  // create stopper key
923                 latchmgr->alloc->fence[1] = 0xff;
924                 latchmgr->alloc->fence[2] = 0xff;
925                 latchmgr->alloc->min = mgr->page_size;
926                 latchmgr->alloc->lvl = lvl;
927                 latchmgr->alloc->cnt = 1;
928                 latchmgr->alloc->act = 1;
929 #ifdef unix
930                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
931                         return bt_mgrclose (mgr), NULL;
932 #else
933                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
934                         return bt_mgrclose (mgr), NULL;
935
936                 if( *amt < mgr->page_size )
937                         return bt_mgrclose (mgr), NULL;
938 #endif
939         }
940
941         // clear out latch manager locks
942         //      and rest of pages to round out segment
943
944         memset(latchmgr, 0, mgr->page_size);
945         last = MIN_lvl + 1;
946
947         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
948 #ifdef unix
949                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
950 #else
951                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
952                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
953                         return bt_mgrclose (mgr), NULL;
954                 if( *amt < mgr->page_size )
955                         return bt_mgrclose (mgr), NULL;
956 #endif
957                 last++;
958         }
959
960 mgrlatch:
961 #ifdef unix
962         flag = PROT_READ | PROT_WRITE;
963         mgr->latchmgr = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
964         if( mgr->latchmgr == MAP_FAILED )
965                 return bt_mgrclose (mgr), NULL;
966         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
967         if( mgr->latchsets == MAP_FAILED )
968                 return bt_mgrclose (mgr), NULL;
969 #else
970         flag = PAGE_READWRITE;
971         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
972         if( !mgr->halloc )
973                 return bt_mgrclose (mgr), NULL;
974
975         flag = FILE_MAP_WRITE;
976         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
977         if( !mgr->latchmgr )
978                 return GetLastError(), bt_mgrclose (mgr), NULL;
979
980         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
981 #endif
982
983 #ifdef unix
984         free (latchmgr);
985 #else
986         VirtualFree (latchmgr, 0, MEM_RELEASE);
987 #endif
988         return mgr;
989 }
990
991 //      open BTree access method
992 //      based on buffer manager
993
994 BtDb *bt_open (BtMgr *mgr)
995 {
996 BtDb *bt = malloc (sizeof(*bt));
997
998         memset (bt, 0, sizeof(*bt));
999         bt->mgr = mgr;
1000 #ifdef unix
1001         bt->mem = malloc (3 *mgr->page_size);
1002 #else
1003         bt->mem = VirtualAlloc(NULL, 3 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1004 #endif
1005         bt->frame = (BtPage)bt->mem;
1006         bt->zero = (BtPage)(bt->mem + 1 * mgr->page_size);
1007         bt->cursor = (BtPage)(bt->mem + 2 * mgr->page_size);
1008
1009         memset (bt->zero, 0, mgr->page_size);
1010         return bt;
1011 }
1012
1013 //  compare two keys, returning > 0, = 0, or < 0
1014 //  as the comparison value
1015
1016 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1017 {
1018 uint len1 = key1->len;
1019 int ans;
1020
1021         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1022                 return ans;
1023
1024         if( len1 > len2 )
1025                 return 1;
1026         if( len1 < len2 )
1027                 return -1;
1028
1029         return 0;
1030 }
1031
1032 //      Buffer Pool mgr
1033
1034 // find segment in pool
1035 // must be called with hashslot idx locked
1036 //      return NULL if not there
1037 //      otherwise return node
1038
1039 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1040 {
1041 BtPool *pool;
1042 uint slot;
1043
1044         // compute start of hash chain in pool
1045
1046         if( slot = bt->mgr->hash[idx] ) 
1047                 pool = bt->mgr->pool + slot;
1048         else
1049                 return NULL;
1050
1051         page_no &= ~bt->mgr->poolmask;
1052
1053         while( pool->basepage != page_no )
1054           if( pool = pool->hashnext )
1055                 continue;
1056           else
1057                 return NULL;
1058
1059         return pool;
1060 }
1061
1062 // add segment to hash table
1063
1064 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1065 {
1066 BtPool *node;
1067 uint slot;
1068
1069         pool->hashprev = pool->hashnext = NULL;
1070         pool->basepage = page_no & ~bt->mgr->poolmask;
1071         pool->lru = 1;
1072
1073         if( slot = bt->mgr->hash[idx] ) {
1074                 node = bt->mgr->pool + slot;
1075                 pool->hashnext = node;
1076                 node->hashprev = pool;
1077         }
1078
1079         bt->mgr->hash[idx] = pool->slot;
1080 }
1081
1082 //      find best segment to evict from buffer pool
1083
1084 BtPool *bt_findlru (BtDb *bt, uint hashslot)
1085 {
1086 unsigned long long int target = ~0LL;
1087 BtPool *pool = NULL, *node;
1088
1089         if( !hashslot )
1090                 return NULL;
1091
1092         node = bt->mgr->pool + hashslot;
1093
1094         //  scan pool entries under hash table slot
1095
1096         do {
1097           if( node->pin )
1098                 continue;
1099           if( node->lru > target )
1100                 continue;
1101           target = node->lru;
1102           pool = node;
1103         } while( node = node->hashnext );
1104
1105         return pool;
1106 }
1107
1108 //  map new buffer pool segment to virtual memory
1109
1110 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1111 {
1112 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1113 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1114 int flag;
1115
1116 #ifdef unix
1117         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1118         pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED | MAP_POPULATE, bt->mgr->idx, off);
1119         if( pool->map == MAP_FAILED )
1120                 return bt->err = BTERR_map;
1121
1122 #else
1123         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1124         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1125         if( !pool->hmap )
1126                 return bt->err = BTERR_map;
1127
1128         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1129         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1130         if( !pool->map )
1131                 return bt->err = BTERR_map;
1132 #endif
1133         return bt->err = 0;
1134 }
1135
1136 //      calculate page within pool
1137
1138 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1139 {
1140 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1141 BtPage page;
1142
1143         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1144         return page;
1145 }
1146
1147 //  release pool pin
1148
1149 void bt_unpinpool (BtPool *pool)
1150 {
1151 #ifdef unix
1152         __sync_fetch_and_add(&pool->pin, -1);
1153 #else
1154         _InterlockedDecrement16 (&pool->pin);
1155 #endif
1156 }
1157
1158 //      find or place requested page in segment-pool
1159 //      return pool table entry, incrementing pin
1160
1161 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1162 {
1163 BtPool *pool, *node, *next;
1164 uint slot, idx, victim;
1165
1166         //      lock hash table chain
1167
1168         idx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1169         bt_spinreadlock (&bt->mgr->latch[idx], 1);
1170
1171         //      look up in hash table
1172
1173         if( pool = bt_findpool(bt, page_no, idx) ) {
1174 #ifdef unix
1175                 __sync_fetch_and_add(&pool->pin, 1);
1176 #else
1177                 _InterlockedIncrement16 (&pool->pin);
1178 #endif
1179                 bt_spinreleaseread (&bt->mgr->latch[idx], 1);
1180                 pool->lru++;
1181                 return pool;
1182         }
1183
1184         //      upgrade to write lock
1185
1186         bt_spinreleaseread (&bt->mgr->latch[idx], 1);
1187         bt_spinwritelock (&bt->mgr->latch[idx], 1);
1188
1189         // try to find page in pool with write lock
1190
1191         if( pool = bt_findpool(bt, page_no, idx) ) {
1192 #ifdef unix
1193                 __sync_fetch_and_add(&pool->pin, 1);
1194 #else
1195                 _InterlockedIncrement16 (&pool->pin);
1196 #endif
1197                 bt_spinreleasewrite (&bt->mgr->latch[idx], 1);
1198                 pool->lru++;
1199                 return pool;
1200         }
1201
1202         // allocate a new pool node
1203         // and add to hash table
1204
1205 #ifdef unix
1206         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1207 #else
1208         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1209 #endif
1210
1211         if( ++slot < bt->mgr->poolmax ) {
1212                 pool = bt->mgr->pool + slot;
1213                 pool->slot = slot;
1214
1215                 if( bt_mapsegment(bt, pool, page_no) )
1216                         return NULL;
1217
1218                 bt_linkhash(bt, pool, page_no, idx);
1219 #ifdef unix
1220                 __sync_fetch_and_add(&pool->pin, 1);
1221 #else
1222                 _InterlockedIncrement16 (&pool->pin);
1223 #endif
1224                 bt_spinreleasewrite (&bt->mgr->latch[idx], 1);
1225                 return pool;
1226         }
1227
1228         // pool table is full
1229         //      find best pool entry to evict
1230
1231 #ifdef unix
1232         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1233 #else
1234         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1235 #endif
1236
1237         while( 1 ) {
1238 #ifdef unix
1239                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1240 #else
1241                 victim = _InterlockedIncrement16 (&bt->mgr->evicted) - 1;
1242 #endif
1243                 victim %= bt->mgr->hashsize;
1244
1245                 // try to get write lock
1246                 //      skip entry if not obtained
1247
1248                 if( !bt_spinwritetry (&bt->mgr->latch[victim]) )
1249                         continue;
1250
1251                 //  if pool entry is empty
1252                 //      or any pages are pinned
1253                 //      skip this entry
1254
1255                 if( !(pool = bt_findlru(bt, bt->mgr->hash[victim])) ) {
1256                         bt_spinreleasewrite (&bt->mgr->latch[victim], 1);
1257                         continue;
1258                 }
1259
1260                 // unlink victim pool node from hash table
1261
1262                 if( node = pool->hashprev )
1263                         node->hashnext = pool->hashnext;
1264                 else if( node = pool->hashnext )
1265                         bt->mgr->hash[victim] = node->slot;
1266                 else
1267                         bt->mgr->hash[victim] = 0;
1268
1269                 if( node = pool->hashnext )
1270                         node->hashprev = pool->hashprev;
1271
1272                 bt_spinreleasewrite (&bt->mgr->latch[victim], 1);
1273
1274                 //      remove old file mapping
1275 #ifdef unix
1276                 munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1277 #else
1278                 FlushViewOfFile(pool->map, 0);
1279                 UnmapViewOfFile(pool->map);
1280                 CloseHandle(pool->hmap);
1281 #endif
1282                 pool->map = NULL;
1283
1284                 //  create new pool mapping
1285                 //  and link into hash table
1286
1287                 if( bt_mapsegment(bt, pool, page_no) )
1288                         return NULL;
1289
1290                 bt_linkhash(bt, pool, page_no, idx);
1291 #ifdef unix
1292                 __sync_fetch_and_add(&pool->pin, 1);
1293 #else
1294                 _InterlockedIncrement16 (&pool->pin);
1295 #endif
1296                 bt_spinreleasewrite (&bt->mgr->latch[idx], 1);
1297                 return pool;
1298         }
1299 }
1300
1301 // place write, read, or parent lock on requested page_no.
1302
1303 void bt_lockpage(BtLock mode, BtLatchSet *set)
1304 {
1305         switch( mode ) {
1306         case BtLockRead:
1307                 bt_spinreadlock (set->readwr, 0);
1308                 break;
1309         case BtLockWrite:
1310                 bt_spinwritelock (set->readwr, 0);
1311                 break;
1312         case BtLockAccess:
1313                 bt_spinreadlock (set->access, 0);
1314                 break;
1315         case BtLockDelete:
1316                 bt_spinwritelock (set->access, 0);
1317                 break;
1318         case BtLockParent:
1319                 bt_spinwritelock (set->parent, 0);
1320                 break;
1321         }
1322 }
1323
1324 // remove write, read, or parent lock on requested page
1325
1326 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1327 {
1328         switch( mode ) {
1329         case BtLockRead:
1330                 bt_spinreleaseread (set->readwr, 0);
1331                 break;
1332         case BtLockWrite:
1333                 bt_spinreleasewrite (set->readwr, 0);
1334                 break;
1335         case BtLockAccess:
1336                 bt_spinreleaseread (set->access, 0);
1337                 break;
1338         case BtLockDelete:
1339                 bt_spinreleasewrite (set->access, 0);
1340                 break;
1341         case BtLockParent:
1342                 bt_spinreleasewrite (set->parent, 0);
1343                 break;
1344         }
1345 }
1346
1347 //      allocate a new page and write page into it
1348
1349 uid bt_newpage(BtDb *bt, BtPage page)
1350 {
1351 BtPageSet set[1];
1352 uid new_page;
1353 int reuse;
1354
1355         //      lock allocation page
1356
1357         bt_spinwritelock(bt->mgr->latchmgr->lock, 0);
1358
1359         // use empty chain first
1360         // else allocate empty page
1361
1362         if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
1363                 if( set->pool = bt_pinpool (bt, new_page) )
1364                         set->page = bt_page (bt, set->pool, new_page);
1365                 else
1366                         return 0;
1367
1368                 bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(set->page->right));
1369                 bt_unpinpool (set->pool);
1370                 reuse = 1;
1371         } else {
1372                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1373                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1374                 reuse = 0;
1375         }
1376 #ifdef unix
1377         if ( pwrite(bt->mgr->idx, page, bt->mgr->page_size, new_page << bt->mgr->page_bits) < bt->mgr->page_size )
1378                 return bt->err = BTERR_wrt, 0;
1379
1380         // if writing first page of pool block, zero last page in the block
1381
1382         if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
1383         {
1384                 // use zero buffer to write zeros
1385                 if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
1386                         return bt->err = BTERR_wrt, 0;
1387         }
1388 #else
1389         //      bring new page into pool and copy page.
1390         //      this will extend the file into the new pages.
1391
1392         if( set->pool = bt_pinpool (bt, new_page) )
1393                 set->page = bt_page (bt, set->pool, new_page);
1394         else
1395                 return 0;
1396
1397         memcpy(set->page, page, bt->mgr->page_size);
1398         bt_unpinpool (set->pool);
1399 #endif
1400         // unlock allocation latch and return new page no
1401
1402         bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0);
1403         return new_page;
1404 }
1405
1406 //  find slot in page for given key at a given level
1407
1408 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1409 {
1410 uint diff, higher = set->page->cnt, low = 1, slot;
1411
1412         //        make stopper key an infinite fence value
1413
1414         if( bt_getid (set->page->right) )
1415                 higher++;
1416
1417         //      low is the lowest candidate.
1418         //  loop ends when they meet
1419
1420         //  higher is already
1421         //      tested as .ge. the given key.
1422
1423         while( diff = higher - low ) {
1424                 slot = low + ( diff >> 1 );
1425                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1426                         low = slot + 1;
1427                 else
1428                         higher = slot;
1429         }
1430
1431         if( higher <= set->page->cnt )
1432                 return higher;
1433
1434         //      if leaf page, compare against fence value
1435
1436         //      return zero if key is on right link page
1437         //      or return slot beyond last key
1438
1439         if( set->page->lvl || keycmp ((BtKey)set->page->fence, key, len) < 0 )
1440                 return 0;
1441
1442         return higher;
1443 }
1444
1445 //  find and load page at given level for given key
1446 //      leave page rd or wr locked as requested
1447
1448 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, uint lock)
1449 {
1450 uid page_no = ROOT_page, prevpage = 0;
1451 uint drill = 0xff, slot;
1452 BtLatchSet *prevlatch;
1453 uint mode, prevmode;
1454 BtPool *prevpool;
1455
1456   //  start at root of btree and drill down
1457
1458   do {
1459         // determine lock mode of drill level
1460         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1461
1462         set->latch = bt_pinlatch (bt, page_no);
1463         set->page_no = page_no;
1464
1465         // pin page contents
1466
1467         if( set->pool = bt_pinpool (bt, page_no) )
1468                 set->page = bt_page (bt, set->pool, page_no);
1469         else
1470                 return 0;
1471
1472         // obtain access lock using lock chaining with Access mode
1473
1474         if( page_no > ROOT_page )
1475           bt_lockpage(BtLockAccess, set->latch);
1476
1477         //      release & unpin parent page
1478
1479         if( prevpage ) {
1480           bt_unlockpage(prevmode, prevlatch);
1481           bt_unpinlatch (prevlatch);
1482           bt_unpinpool (prevpool);
1483           prevpage = 0;
1484         }
1485
1486         // obtain read lock using lock chaining
1487
1488         bt_lockpage(mode, set->latch);
1489
1490         if( page_no > ROOT_page )
1491           bt_unlockpage(BtLockAccess, set->latch);
1492
1493         // re-read and re-lock root after determining actual level of root
1494
1495         if( set->page->lvl != drill) {
1496                 if ( set->page_no != ROOT_page )
1497                         return bt->err = BTERR_struct, 0;
1498                         
1499                 drill = set->page->lvl;
1500
1501                 if( lock == BtLockWrite && drill == lvl ) {
1502                   bt_unlockpage(mode, set->latch);
1503                   bt_unpinlatch (set->latch);
1504                   bt_unpinpool (set->pool);
1505                   continue;
1506                 }
1507         }
1508
1509         prevpage = set->page_no;
1510         prevlatch = set->latch;
1511         prevpool = set->pool;
1512         prevmode = mode;
1513
1514         //      if page is being deleted and we should continue right
1515
1516         if( set->page->kill && set->page->goright ) {
1517                 page_no = bt_getid (set->page->right);
1518                 continue;
1519         }
1520
1521         //      otherwise, wait for deleted node to clear
1522
1523         if( set->page->kill ) {
1524                 bt_unlockpage(mode, set->latch);
1525                 bt_unpinlatch (set->latch);
1526                 bt_unpinpool (set->pool);
1527                 page_no = ROOT_page;
1528                 prevpage = 0;
1529                 drill = 0xff;
1530 #ifdef unix
1531                 sched_yield();
1532 #else
1533                 SwitchToThread();
1534 #endif
1535                 continue;
1536         }
1537         
1538         //  find key on page at this level
1539         //  and descend to requested level
1540
1541         if( slot = bt_findslot (set, key, len) ) {
1542           if( drill == lvl )
1543                 return slot;
1544
1545           if( slot > set->page->cnt )
1546                 return bt->err = BTERR_struct;
1547
1548           while( slotptr(set->page, slot)->dead )
1549                 if( slot++ < set->page->cnt )
1550                         continue;
1551                 else
1552                         return bt->err = BTERR_struct, 0;
1553
1554           page_no = bt_getid(slotptr(set->page, slot)->id);
1555           drill--;
1556           continue;
1557         }
1558
1559         //  or slide right into next page
1560
1561         page_no = bt_getid(set->page->right);
1562
1563   } while( page_no );
1564
1565   // return error on end of right chain
1566
1567   bt->err = BTERR_struct;
1568   return 0;     // return error
1569 }
1570
1571 // drill down fixing fence values for left sibling tree
1572
1573 //  call with set write locked
1574 //      return with set unlocked & unpinned.
1575
1576 BTERR bt_fixfences (BtDb *bt, BtPageSet *set, unsigned char *newfence)
1577 {
1578 unsigned char oldfence[256];
1579 BtPageSet next[1];
1580 int chk;
1581
1582   memcpy (oldfence, set->page->fence, 256);
1583
1584   while( !set->page->kill && set->page->lvl ) {
1585         next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
1586         next->latch = bt_pinlatch (bt, next->page_no);
1587         bt_lockpage (BtLockParent, next->latch);
1588         bt_lockpage (BtLockAccess, next->latch);
1589         bt_lockpage (BtLockWrite, next->latch);
1590         bt_unlockpage (BtLockAccess, next->latch);
1591
1592         if( next->pool = bt_pinpool (bt, next->page_no) )
1593                 next->page = bt_page (bt, next->pool, next->page_no);
1594         else
1595                 return bt->err;
1596
1597         chk = keycmp ((BtKey)next->page->fence, oldfence + 1, *oldfence);
1598
1599         if( chk < 0 ) {
1600           next->page_no = bt_getid (next->page->right);
1601           bt_unlockpage (BtLockWrite, next->latch);
1602           bt_unlockpage (BtLockParent, next->latch);
1603           bt_unpinlatch (next->latch);
1604           bt_unpinpool (next->pool);
1605           continue;
1606         }
1607
1608         if( chk > 0 )
1609                 return bt->err = BTERR_struct;
1610
1611         if( bt_fixfences (bt, next, newfence) )
1612                 return bt->err;
1613
1614         break;
1615   }
1616
1617   memcpy (set->page->fence, newfence, 256);
1618
1619   bt_unlockpage (BtLockWrite, set->latch);
1620   bt_unlockpage (BtLockParent, set->latch);
1621   bt_unpinlatch (set->latch);
1622   bt_unpinpool (set->pool);
1623   return 0;
1624 }
1625
1626 //      return page to free list
1627 //      page must be delete & write locked
1628
1629 void bt_freepage (BtDb *bt, BtPageSet *set)
1630 {
1631         //      lock allocation page
1632
1633         bt_spinwritelock (bt->mgr->latchmgr->lock, 0);
1634
1635         //      store chain in second right
1636         bt_putid(set->page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
1637         bt_putid(bt->mgr->latchmgr->alloc[1].right, set->page_no);
1638         set->page->free = 1;
1639
1640         // unlock released page
1641
1642         bt_unlockpage (BtLockDelete, set->latch);
1643         bt_unlockpage (BtLockWrite, set->latch);
1644         bt_unpinlatch (set->latch);
1645         bt_unpinpool (set->pool);
1646
1647         // unlock allocation page
1648
1649         bt_spinreleasewrite (bt->mgr->latchmgr->lock, 0);
1650 }
1651
1652 //      remove the root level by promoting its only child
1653 //      call with parent and child pages
1654
1655 BTERR bt_removeroot (BtDb *bt, BtPageSet *root, BtPageSet *child)
1656 {
1657 uid next = 0;
1658
1659   do {
1660         if( next ) {
1661           child->latch = bt_pinlatch (bt, next);
1662           bt_lockpage (BtLockDelete, child->latch);
1663           bt_lockpage (BtLockWrite, child->latch);
1664
1665           if( child->pool = bt_pinpool (bt, next) )
1666                 child->page = bt_page (bt, child->pool, next);
1667           else
1668                 return bt->err;
1669
1670           child->page_no = next;
1671         }
1672
1673         memcpy (root->page, child->page, bt->mgr->page_size);
1674         next = bt_getid (slotptr(child->page, child->page->cnt)->id);
1675         bt_freepage (bt, child);
1676   } while( root->page->lvl > 1 && root->page->cnt == 1 );
1677
1678   bt_unlockpage (BtLockWrite, root->latch);
1679   bt_unpinlatch (root->latch);
1680   bt_unpinpool (root->pool);
1681   return 0;
1682 }
1683
1684 //  pull right page over ourselves in simple merge
1685
1686 BTERR bt_mergeright (BtDb *bt, BtPageSet *set, BtPageSet *parent, BtPageSet *right, uint slot, uint idx)
1687 {
1688         //  install ourselves as child page
1689         //      and delete ourselves from parent
1690
1691         bt_putid (slotptr(parent->page, idx)->id, set->page_no);
1692         slotptr(parent->page, slot)->dead = 1;
1693         parent->page->act--;
1694
1695         //      collapse any empty slots
1696
1697         while( idx = parent->page->cnt - 1 )
1698           if( slotptr(parent->page, idx)->dead ) {
1699                 *slotptr(parent->page, idx) = *slotptr(parent->page, idx + 1);
1700                 memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
1701           } else
1702                   break;
1703
1704         memcpy (set->page, right->page, bt->mgr->page_size);
1705         bt_unlockpage (BtLockParent, right->latch);
1706
1707         bt_freepage (bt, right);
1708
1709         //      do we need to remove a btree level?
1710         //      (leave the first page of leaves alone)
1711
1712         if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
1713           if( set->page->lvl )
1714                 return bt_removeroot (bt, parent, set);
1715
1716         bt_unlockpage (BtLockWrite, parent->latch);
1717         bt_unlockpage (BtLockDelete, set->latch);
1718         bt_unlockpage (BtLockWrite, set->latch);
1719         bt_unpinlatch (parent->latch);
1720         bt_unpinpool (parent->pool);
1721         bt_unpinlatch (set->latch);
1722         bt_unpinpool (set->pool);
1723         return 0;
1724 }
1725
1726 //      remove both child and parent from the btree
1727 //      from the fence position in the parent
1728 //      call with both pages locked for writing
1729
1730 BTERR bt_removeparent (BtDb *bt, BtPageSet *child, BtPageSet *parent, BtPageSet *right, BtPageSet *rparent, uint lvl)
1731 {
1732 unsigned char pagefence[256];
1733 uint idx;
1734
1735         //  pull right sibling over ourselves and unlock
1736
1737         memcpy (child->page, right->page, bt->mgr->page_size);
1738
1739         bt_unlockpage (BtLockWrite, child->latch);
1740         bt_unpinlatch (child->latch);
1741         bt_unpinpool (child->pool);
1742
1743         //  install ourselves into right link of old right page
1744
1745         bt_putid (right->page->right, child->page_no);
1746         right->page->goright = 1;       // tell bt_loadpage to go right to us
1747         right->page->kill = 1;
1748
1749         bt_unlockpage (BtLockWrite, right->latch);
1750
1751         //      remove our slot from our parent
1752         //      signal to move right
1753
1754         parent->page->goright = 1;      // tell bt_loadpage to go right to rparent
1755         parent->page->kill = 1;
1756         parent->page->act--;
1757
1758         //      redirect right page pointer in right parent to us
1759
1760         for( idx = 0; idx++ < rparent->page->cnt; )
1761           if( !slotptr(rparent->page, idx)->dead )
1762                 break;
1763
1764         if( bt_getid (slotptr(rparent->page, idx)->id) != right->page_no )
1765                 return bt->err = BTERR_struct;
1766
1767         bt_putid (slotptr(rparent->page, idx)->id, child->page_no);
1768         bt_unlockpage (BtLockWrite, rparent->latch);
1769         bt_unpinlatch (rparent->latch);
1770         bt_unpinpool (rparent->pool);
1771
1772         //      free the right page
1773
1774         bt_lockpage (BtLockDelete, right->latch);
1775         bt_lockpage (BtLockWrite, right->latch);
1776         bt_freepage (bt, right);
1777
1778         //      save parent page fence value
1779
1780         memcpy (pagefence, parent->page->fence, 256);
1781         bt_unlockpage (BtLockWrite, parent->latch);
1782
1783         return bt_removepage (bt, parent, lvl, pagefence);
1784 }
1785
1786 //      remove page from btree
1787 //      call with page unlocked
1788 //      returns with page on free list
1789
1790 BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence)
1791 {
1792 BtPageSet parent[1], sibling[1], rparent[1];
1793 unsigned char newfence[256];
1794 uint slot, idx;
1795 BtKey ptr;
1796
1797         //      load and lock our parent
1798
1799 retry:
1800         if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
1801                 return bt->err;
1802
1803         //      can we do a simple merge entirely
1804         //      between siblings on the parent page?
1805
1806         if( slot < parent->page->cnt ) {
1807                 // find our right neighbor
1808                 //      right must exist because the stopper prevents
1809                 //      the rightmost page from deleting
1810
1811                 for( idx = slot; idx++ < parent->page->cnt; )
1812                   if( !slotptr(parent->page, idx)->dead )
1813                         break;
1814
1815                 sibling->page_no = bt_getid (slotptr (parent->page, idx)->id);
1816
1817                 bt_lockpage (BtLockDelete, set->latch);
1818                 bt_lockpage (BtLockWrite, set->latch);
1819
1820                 //      merge right if sibling shows up in
1821                 //  our parent and is not being killed
1822
1823                 if( sibling->page_no == bt_getid (set->page->right) ) {
1824                   sibling->latch = bt_pinlatch (bt, sibling->page_no);
1825                   bt_lockpage (BtLockParent, sibling->latch);
1826                   bt_lockpage (BtLockDelete, sibling->latch);
1827                   bt_lockpage (BtLockWrite, sibling->latch);
1828
1829                   if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
1830                         sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
1831                   else
1832                         return bt->err;
1833
1834                   if( !sibling->page->kill )
1835                         return bt_mergeright(bt, set, parent, sibling, slot, idx);
1836
1837                   //  try again later
1838
1839                   bt_unlockpage (BtLockWrite, sibling->latch);
1840                   bt_unlockpage (BtLockParent, sibling->latch);
1841                   bt_unlockpage (BtLockDelete, sibling->latch);
1842                   bt_unpinlatch (sibling->latch);
1843                   bt_unpinpool (sibling->pool);
1844                 }
1845
1846                 bt_unlockpage (BtLockDelete, set->latch);
1847                 bt_unlockpage (BtLockWrite, set->latch);
1848                 bt_unlockpage (BtLockWrite, parent->latch);
1849                 bt_unpinlatch (parent->latch);
1850                 bt_unpinpool (parent->pool);
1851 #ifdef linux
1852                 sched_yield();
1853 #else
1854                 SwitchToThread();
1855 #endif
1856                 goto retry;
1857         }
1858
1859         //  find our left neighbor in our parent page
1860
1861         for( idx = slot; --idx; )
1862           if( !slotptr(parent->page, idx)->dead )
1863                 break;
1864
1865         //      if no left neighbor, delete ourselves and our parent
1866
1867         if( !idx ) {
1868                 bt_lockpage (BtLockAccess, set->latch);
1869                 bt_lockpage (BtLockWrite, set->latch);
1870                 bt_unlockpage (BtLockAccess, set->latch);
1871
1872                 rparent->page_no = bt_getid (parent->page->right);
1873                 rparent->latch = bt_pinlatch (bt, rparent->page_no);
1874
1875                 bt_lockpage (BtLockAccess, rparent->latch);
1876                 bt_lockpage (BtLockWrite, rparent->latch);
1877                 bt_unlockpage (BtLockAccess, rparent->latch);
1878
1879                 if( rparent->pool = bt_pinpool (bt, rparent->page_no) )
1880                         rparent->page = bt_page (bt, rparent->pool, rparent->page_no);
1881                 else
1882                         return bt->err;
1883
1884                 if( !rparent->page->kill ) {
1885                   sibling->page_no = bt_getid (set->page->right);
1886                   sibling->latch = bt_pinlatch (bt, sibling->page_no);
1887
1888                   bt_lockpage (BtLockAccess, sibling->latch);
1889                   bt_lockpage (BtLockWrite, sibling->latch);
1890                   bt_unlockpage (BtLockAccess, sibling->latch);
1891
1892                   if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
1893                         sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
1894                   else
1895                         return bt->err;
1896
1897                   if( !sibling->page->kill )
1898                         return bt_removeparent (bt, set, parent, sibling, rparent, lvl+1);
1899
1900                   //  try again later
1901
1902                   bt_unlockpage (BtLockWrite, sibling->latch);
1903                   bt_unpinlatch (sibling->latch);
1904                   bt_unpinpool (sibling->pool);
1905                 }
1906
1907                 bt_unlockpage (BtLockWrite, set->latch);
1908                 bt_unlockpage (BtLockWrite, rparent->latch);
1909                 bt_unpinlatch (rparent->latch);
1910                 bt_unpinpool (rparent->pool);
1911
1912                 bt_unlockpage (BtLockWrite, parent->latch);
1913                 bt_unpinlatch (parent->latch);
1914                 bt_unpinpool (parent->pool);
1915 #ifdef linux
1916                 sched_yield();
1917 #else
1918                 SwitchToThread();
1919 #endif
1920                 goto retry;
1921         }
1922
1923         // redirect parent to our left sibling
1924         // lock and map our left sibling's page
1925
1926         sibling->page_no = bt_getid (slotptr(parent->page, idx)->id);
1927         sibling->latch = bt_pinlatch (bt, sibling->page_no);
1928
1929         //      wait our turn on fence key maintenance
1930
1931         bt_lockpage(BtLockParent, sibling->latch);
1932         bt_lockpage(BtLockAccess, sibling->latch);
1933         bt_lockpage(BtLockWrite, sibling->latch);
1934         bt_unlockpage(BtLockAccess, sibling->latch);
1935
1936         if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
1937                 sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
1938         else
1939                 return bt->err;
1940
1941         //  wait until left sibling is in our parent
1942
1943         if( bt_getid (sibling->page->right) != set->page_no ) {
1944                 bt_unlockpage (BtLockWrite, parent->latch);
1945                 bt_unlockpage (BtLockWrite, sibling->latch);
1946                 bt_unlockpage (BtLockParent, sibling->latch);
1947                 bt_unpinlatch (parent->latch);
1948                 bt_unpinpool (parent->pool);
1949                 bt_unpinlatch (sibling->latch);
1950                 bt_unpinpool (sibling->pool);
1951 #ifdef linux
1952                 sched_yield();
1953 #else
1954                 SwitchToThread();
1955 #endif
1956                 goto retry;
1957         }
1958
1959         //      delete our left sibling from parent
1960
1961         slotptr(parent->page,idx)->dead = 1;
1962         parent->page->dirty = 1;
1963         parent->page->act--;
1964
1965         //      redirect our parent slot to our left sibling
1966
1967         bt_putid (slotptr(parent->page, slot)->id, sibling->page_no);
1968         memcpy (sibling->page->right, set->page->right, BtId);
1969
1970         //      collapse dead slots from parent
1971
1972         while( idx = parent->page->cnt - 1 )
1973           if( slotptr(parent->page, idx)->dead ) {
1974                 *slotptr(parent->page, idx) = *slotptr(parent->page, parent->page->cnt);
1975                 memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
1976           } else
1977                   break;
1978
1979         //  free our original page
1980
1981         bt_lockpage (BtLockDelete, set->latch);
1982         bt_lockpage (BtLockWrite, set->latch);
1983         bt_freepage (bt, set);
1984
1985         //      go down the left node's fence keys to the leaf level
1986         //      and update the fence keys in each page
1987
1988         memcpy (newfence, parent->page->fence, 256);
1989
1990         if( bt_fixfences (bt, sibling, newfence) )
1991                 return bt->err;
1992
1993         //  promote sibling as new root?
1994
1995         if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
1996          if( sibling->page->lvl ) {
1997           sibling->latch = bt_pinlatch (bt, sibling->page_no);
1998           bt_lockpage (BtLockDelete, sibling->latch);
1999           bt_lockpage (BtLockWrite, sibling->latch);
2000
2001           if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
2002                 sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
2003           else
2004                 return bt->err;
2005
2006           return bt_removeroot (bt, parent, sibling);
2007          }
2008
2009         bt_unlockpage (BtLockWrite, parent->latch);
2010         bt_unpinlatch (parent->latch);
2011         bt_unpinpool (parent->pool);
2012
2013         return 0;
2014 }
2015
2016 //  find and delete key on page by marking delete flag bit
2017 //  if page becomes empty, delete it from the btree
2018
2019 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
2020 {
2021 unsigned char pagefence[256];
2022 uint slot, idx, found;
2023 BtPageSet set[1];
2024 BtKey ptr;
2025
2026         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockWrite) )
2027                 ptr = keyptr(set->page, slot);
2028         else
2029                 return bt->err;
2030
2031         // if key is found delete it, otherwise ignore request
2032
2033         if( found = slot <= set->page->cnt )
2034           if( found = !keycmp (ptr, key, len) )
2035                 if( found = slotptr(set->page, slot)->dead == 0 ) {
2036                   slotptr(set->page,slot)->dead = 1;
2037                   set->page->dirty = 1;
2038                   set->page->act--;
2039
2040                   // collapse empty slots
2041
2042                   while( idx = set->page->cnt - 1 )
2043                         if( slotptr(set->page, idx)->dead ) {
2044                           *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2045                           memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2046                         } else
2047                                 break;
2048                 }
2049
2050         if( set->page->act ) {
2051                 bt_unlockpage(BtLockWrite, set->latch);
2052                 bt_unpinlatch (set->latch);
2053                 bt_unpinpool (set->pool);
2054                 return bt->found = found, 0;
2055         }
2056
2057         memcpy (pagefence, set->page->fence, 256);
2058         set->page->kill = 1;
2059
2060         bt_unlockpage (BtLockWrite, set->latch);
2061
2062         if( bt_removepage (bt, set, 0, pagefence) )
2063                 return bt->err;
2064
2065         bt->found = found;
2066         return 0;
2067 }
2068
2069 //      find key in leaf level and return row-id
2070
2071 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
2072 {
2073 BtPageSet set[1];
2074 uint  slot;
2075 BtKey ptr;
2076 uid id;
2077
2078         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2079                 ptr = keyptr(set->page, slot);
2080         else
2081                 return 0;
2082
2083         // if key exists, return row-id
2084         //      otherwise return 0
2085
2086         if( !keycmp (ptr, key, len) )
2087                 id = bt_getid(slotptr(set->page,slot)->id);
2088         else
2089                 id = 0;
2090
2091         bt_unlockpage (BtLockRead, set->latch);
2092         bt_unpinlatch (set->latch);
2093         bt_unpinpool (set->pool);
2094         return id;
2095 }
2096
2097 //      check page for space available,
2098 //      clean if necessary and return
2099 //      0 - page needs splitting
2100 //      >0  new slot value
2101
2102 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
2103 {
2104 uint nxt = bt->mgr->page_size, off;
2105 uint cnt = 0, idx = 0;
2106 uint max = page->cnt;
2107 uint newslot = max;
2108 BtKey key;
2109
2110         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
2111                 return slot;
2112
2113         //      skip cleanup if nothing to reclaim
2114
2115         if( !page->dirty )
2116                 return 0;
2117
2118         memcpy (bt->frame, page, bt->mgr->page_size);
2119
2120         // skip page info and set rest of page to zero
2121
2122         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2123         page->dirty = 0;
2124         page->act = 0;
2125
2126         // try cleaning up page first
2127         // by removing deleted keys
2128
2129         while( cnt++ < max ) {
2130                 if( cnt == slot )
2131                         newslot = idx + 1;
2132                 if( slotptr(bt->frame,cnt)->dead )
2133                         continue;
2134
2135                 // if its not the fence key,
2136                 // copy the key across
2137
2138                 off = slotptr(bt->frame,cnt)->off;
2139
2140                 if( off >= sizeof(*page) ) {
2141                         key = keyptr(bt->frame, cnt);
2142                         off = nxt -= key->len + 1;
2143                         memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2144                 }
2145
2146                 // copy slot
2147
2148                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
2149                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2150                 slotptr(page, idx)->off = off;
2151                 page->act++;
2152         }
2153
2154         page->min = nxt;
2155         page->cnt = idx;
2156
2157         //      see if page has enough space now, or does it need splitting?
2158
2159         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
2160                 return newslot;
2161
2162         return 0;
2163 }
2164
2165 // split the root and raise the height of the btree
2166
2167 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, uid page_no2)
2168 {
2169 uint nxt = bt->mgr->page_size;
2170 unsigned char leftkey[256];
2171 uid new_page;
2172
2173         //  Obtain an empty page to use, and copy the current
2174         //  root contents into it, e.g. lower keys
2175
2176         memcpy (leftkey, root->page->fence, 256);
2177         root->page->posted = 1;
2178
2179         if( !(new_page = bt_newpage(bt, root->page)) )
2180                 return bt->err;
2181
2182         // preserve the page info at the bottom
2183         // of higher keys and set rest to zero
2184
2185         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2186         memset(root->page->fence, 0, 256);
2187         root->page->fence[0] = 2;
2188         root->page->fence[1] = 0xff;
2189         root->page->fence[2] = 0xff;
2190
2191         // insert lower keys page fence key on newroot page
2192
2193         nxt -= *leftkey + 1;
2194         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
2195         bt_putid(slotptr(root->page, 1)->id, new_page);
2196         slotptr(root->page, 1)->off = nxt;
2197         
2198         // insert stopper key on newroot page
2199         // and increase the root height
2200
2201         bt_putid(slotptr(root->page, 2)->id, page_no2);
2202         slotptr(root->page, 2)->off = offsetof(struct BtPage_, fence);
2203
2204         bt_putid(root->page->right, 0);
2205         root->page->min = nxt;          // reset lowest used offset and key count
2206         root->page->cnt = 2;
2207         root->page->act = 2;
2208         root->page->lvl++;
2209
2210         // release and unpin root
2211
2212         bt_unlockpage(BtLockWrite, root->latch);
2213         bt_unpinlatch (root->latch);
2214         bt_unpinpool (root->pool);
2215         return 0;
2216 }
2217
2218 //  split already locked full node
2219 //      return unlocked.
2220
2221 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2222 {
2223 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size, off;
2224 unsigned char fencekey[256];
2225 uint lvl = set->page->lvl;
2226 uid right;
2227 BtKey key;
2228
2229         //  split higher half of keys to bt->frame
2230
2231         memset (bt->frame, 0, bt->mgr->page_size);
2232         max = set->page->cnt;
2233         cnt = max / 2;
2234         idx = 0;
2235
2236         while( cnt++ < max ) {
2237                 if( !lvl || cnt < max ) {
2238                         key = keyptr(set->page, cnt);
2239                         off = nxt -= key->len + 1;
2240                         memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
2241                 } else
2242                         off = offsetof(struct BtPage_, fence);
2243
2244                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId);
2245                 slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod;
2246                 slotptr(bt->frame, idx)->off = off;
2247                 bt->frame->act++;
2248         }
2249
2250         if( set->page_no == ROOT_page )
2251                 bt->frame->posted = 1;
2252
2253         memcpy (bt->frame->fence, set->page->fence, 256);
2254         bt->frame->bits = bt->mgr->page_bits;
2255         bt->frame->min = nxt;
2256         bt->frame->cnt = idx;
2257         bt->frame->lvl = lvl;
2258
2259         // link right node
2260
2261         if( set->page_no > ROOT_page )
2262                 memcpy (bt->frame->right, set->page->right, BtId);
2263
2264         //      get new free page and write higher keys to it.
2265
2266         if( !(right = bt_newpage(bt, bt->frame)) )
2267                 return bt->err;
2268
2269         //      update lower keys to continue in old page
2270
2271         memcpy (bt->frame, set->page, bt->mgr->page_size);
2272         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2273         nxt = bt->mgr->page_size;
2274         set->page->posted = 0;
2275         set->page->dirty = 0;
2276         set->page->act = 0;
2277         cnt = 0;
2278         idx = 0;
2279
2280         //  assemble page of smaller keys
2281
2282         while( cnt++ < max / 2 ) {
2283                 key = keyptr(bt->frame, cnt);
2284
2285                 if( !lvl || cnt < max / 2 ) {
2286                         off = nxt -= key->len + 1;
2287                         memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2288                 } else 
2289                         off = offsetof(struct BtPage_, fence);
2290
2291                 memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
2292                 slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2293                 slotptr(set->page, idx)->off = off;
2294                 set->page->act++;
2295         }
2296
2297         // install fence key for smaller key page
2298
2299         memset(set->page->fence, 0, 256);
2300         memcpy(set->page->fence, key, key->len + 1);
2301
2302         bt_putid(set->page->right, right);
2303         set->page->min = nxt;
2304         set->page->cnt = idx;
2305
2306         // if current page is the root page, split it
2307
2308         if( set->page_no == ROOT_page )
2309                 return bt_splitroot (bt, set, right);
2310
2311         bt_unlockpage (BtLockWrite, set->latch);
2312
2313         // insert new fences in their parent pages
2314
2315         while( 1 ) {
2316                 bt_lockpage (BtLockParent, set->latch);
2317                 bt_lockpage (BtLockWrite, set->latch);
2318
2319                 memcpy (fencekey, set->page->fence, 256);
2320                 right = bt_getid (set->page->right);
2321
2322                 if( set->page->posted ) {
2323                         bt_unlockpage (BtLockParent, set->latch);
2324                         bt_unlockpage (BtLockWrite, set->latch);
2325                         bt_unpinlatch (set->latch);
2326                         bt_unpinpool (set->pool);
2327                         return 0;
2328                 }
2329
2330                 set->page->posted = 1;
2331                 bt_unlockpage (BtLockWrite, set->latch);
2332
2333                 if( bt_insertkey (bt, fencekey+1, *fencekey, set->page_no, time(NULL), lvl+1) )
2334                         return bt->err;
2335
2336                 bt_unlockpage (BtLockParent, set->latch);
2337                 bt_unpinlatch (set->latch);
2338                 bt_unpinpool (set->pool);
2339
2340                 if( !(set->page_no = right) )
2341                         break;
2342
2343                 set->latch = bt_pinlatch (bt, right);
2344
2345                 if( set->pool = bt_pinpool (bt, right) )
2346                         set->page = bt_page (bt, set->pool, right);
2347                 else
2348                         return bt->err;
2349         }
2350
2351         return 0;
2352 }
2353
2354 //  Insert new key into the btree at given level.
2355
2356 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
2357 {
2358 BtPageSet set[1];
2359 uint slot, idx;
2360 BtKey ptr;
2361
2362         while( 1 ) {
2363                 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
2364                         ptr = keyptr(set->page, slot);
2365                 else
2366                 {
2367                         if ( !bt->err )
2368                                 bt->err = BTERR_ovflw;
2369                         return bt->err;
2370                 }
2371
2372                 // if key already exists, update id and return
2373
2374                 if( slot <= set->page->cnt )
2375                   if( !keycmp (ptr, key, len) ) {
2376                         if( slotptr(set->page, slot)->dead )
2377                                 set->page->act++;
2378                         slotptr(set->page, slot)->dead = 0;
2379                         slotptr(set->page, slot)->tod = tod;
2380                         bt_putid(slotptr(set->page,slot)->id, id);
2381                         bt_unlockpage(BtLockWrite, set->latch);
2382                         bt_unpinlatch (set->latch);
2383                         bt_unpinpool (set->pool);
2384                         return 0;
2385                 }
2386
2387                 // check if page has enough space
2388
2389                 if( slot = bt_cleanpage (bt, set->page, len, slot) )
2390                         break;
2391
2392                 if( bt_splitpage (bt, set) )
2393                         return bt->err;
2394         }
2395
2396         // calculate next available slot and copy key into page
2397
2398         set->page->min -= len + 1; // reset lowest used offset
2399         ((unsigned char *)set->page)[set->page->min] = len;
2400         memcpy ((unsigned char *)set->page + set->page->min +1, key, len );
2401
2402         for( idx = slot; idx <= set->page->cnt; idx++ )
2403           if( slotptr(set->page, idx)->dead )
2404                 break;
2405
2406         // now insert key into array before slot
2407
2408         if( idx > set->page->cnt )
2409                 set->page->cnt++;
2410
2411         set->page->act++;
2412
2413         while( idx > slot )
2414                 *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
2415
2416         bt_putid(slotptr(set->page,slot)->id, id);
2417         slotptr(set->page, slot)->off = set->page->min;
2418         slotptr(set->page, slot)->tod = tod;
2419         slotptr(set->page, slot)->dead = 0;
2420
2421         bt_unlockpage (BtLockWrite, set->latch);
2422         bt_unpinlatch (set->latch);
2423         bt_unpinpool (set->pool);
2424         return 0;
2425 }
2426
2427 //  cache page of keys into cursor and return starting slot for given key
2428
2429 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2430 {
2431 BtPageSet set[1];
2432 uint slot;
2433
2434         // cache page for retrieval
2435
2436         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2437           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2438         else
2439           return 0;
2440
2441         bt->cursor_page = set->page_no;
2442
2443         bt_unlockpage(BtLockRead, set->latch);
2444         bt_unpinlatch (set->latch);
2445         bt_unpinpool (set->pool);
2446         return slot;
2447 }
2448
2449 //  return next slot for cursor page
2450 //  or slide cursor right into next page
2451
2452 uint bt_nextkey (BtDb *bt, uint slot)
2453 {
2454 BtPageSet set[1];
2455 uid right;
2456
2457   do {
2458         right = bt_getid(bt->cursor->right);
2459         while( slot++ < bt->cursor->cnt )
2460           if( slotptr(bt->cursor,slot)->dead )
2461                 continue;
2462           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2463                 return slot;
2464           else
2465                 break;
2466
2467         if( !right )
2468                 break;
2469
2470         bt->cursor_page = right;
2471
2472         if( set->pool = bt_pinpool (bt, right) )
2473                 set->page = bt_page (bt, set->pool, right);
2474         else
2475                 return 0;
2476
2477         set->latch = bt_pinlatch (bt, right);
2478     bt_lockpage(BtLockRead, set->latch);
2479
2480         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2481
2482         bt_unlockpage(BtLockRead, set->latch);
2483         bt_unpinlatch (set->latch);
2484         bt_unpinpool (set->pool);
2485         slot = 0;
2486   } while( 1 );
2487
2488   return bt->err = 0;
2489 }
2490
2491 BtKey bt_key(BtDb *bt, uint slot)
2492 {
2493         return keyptr(bt->cursor, slot);
2494 }
2495
2496 uid bt_uid(BtDb *bt, uint slot)
2497 {
2498         return bt_getid(slotptr(bt->cursor,slot)->id);
2499 }
2500
2501 uint bt_tod(BtDb *bt, uint slot)
2502 {
2503         return slotptr(bt->cursor,slot)->tod;
2504 }
2505
2506
2507 #ifdef STANDALONE
2508
2509 void bt_latchaudit (BtDb *bt)
2510 {
2511 ushort idx, hashidx;
2512 BtLatchSet *latch;
2513 BtPool *pool;
2514 BtPage page;
2515 uid page_no;
2516
2517 #ifdef unix
2518         for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
2519                 latch = bt->mgr->latchsets + idx;
2520                 if( *(uint *)latch->readwr ) {
2521                         fprintf(stderr, "latchset %d r/w locked for page %.8x\n", idx, latch->page_no);
2522                         *(uint *)latch->readwr = 0;
2523                 }
2524                 if( *(uint *)latch->access ) {
2525                         fprintf(stderr, "latchset %d access locked for page %.8x\n", idx, latch->page_no);
2526                         *(uint *)latch->access = 0;
2527                 }
2528                 if( *(uint *)latch->parent ) {
2529                         fprintf(stderr, "latchset %d parent locked for page %.8x\n", idx, latch->page_no);
2530                         *(uint *)latch->parent = 0;
2531                 }
2532                 if( *(uint *)latch->busy ) {
2533                         fprintf(stderr, "latchset %d busy locked for page %.8x\n", idx, latch->page_no);
2534                         *(uint *)latch->parent = 0;
2535                 }
2536                 if( latch->pin ) {
2537                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2538                         latch->pin = 0;
2539                 }
2540         }
2541
2542         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2543           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2544                 latch = bt->mgr->latchsets + idx;
2545                 if( latch->hash != hashidx ) {
2546                         fprintf(stderr, "latchset %d wrong hashidx\n", idx);
2547                         latch->hash = hashidx;
2548                 }
2549           } while( idx = latch->next );
2550         }
2551
2552         page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
2553
2554         while( page_no ) {
2555                 fprintf(stderr, "free: %.6x\n", (uint)page_no);
2556
2557                 if( pool = bt_pinpool (bt, page_no) )
2558                         page = bt_page (bt, pool, page_no);
2559                 else
2560                         return;
2561
2562             page_no = bt_getid(page->right);
2563                 bt_unpinpool (pool);
2564         }
2565 #endif
2566 }
2567
2568 typedef struct {
2569         char type, idx;
2570         char *infile;
2571         BtMgr *mgr;
2572         int num;
2573 } ThreadArg;
2574
2575 //  standalone program to index file of keys
2576 //  then list them onto std-out
2577
2578 #ifdef unix
2579 void *index_file (void *arg)
2580 #else
2581 uint __stdcall index_file (void *arg)
2582 #endif
2583 {
2584 int line = 0, found = 0, cnt = 0;
2585 uid next, page_no = LEAF_page;  // start on first page of leaves
2586 unsigned char key[256];
2587 ThreadArg *args = arg;
2588 int ch, len = 0, slot;
2589 BtPageSet set[1];
2590 time_t tod[1];
2591 BtKey ptr;
2592 BtDb *bt;
2593 FILE *in;
2594
2595         bt = bt_open (args->mgr);
2596         time (tod);
2597
2598         switch(args->type | 0x20)
2599         {
2600         case 'a':
2601                 fprintf(stderr, "started latch mgr audit\n");
2602                 bt_latchaudit (bt);
2603                 fprintf(stderr, "finished latch mgr audit\n");
2604                 break;
2605
2606         case 'w':
2607                 fprintf(stderr, "started indexing for %s\n", args->infile);
2608                 if( in = fopen (args->infile, "rb") )
2609                   while( ch = getc(in), ch != EOF )
2610                         if( ch == '\n' )
2611                         {
2612                           line++;
2613
2614                           if( args->num == 1 )
2615                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2616
2617                           else if( args->num )
2618                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2619
2620                           if( bt_insertkey (bt, key, len, line, *tod, 0) )
2621                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2622                           len = 0;
2623                         }
2624                         else if( len < 255 )
2625                                 key[len++] = ch;
2626                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2627                 break;
2628
2629         case 'd':
2630                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2631                 if( in = fopen (args->infile, "rb") )
2632                   while( ch = getc(in), ch != EOF )
2633                         if( ch == '\n' )
2634                         {
2635                           line++;
2636                           if( args->num == 1 )
2637                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2638
2639                           else if( args->num )
2640                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2641
2642                           if( bt_deletekey (bt, key, len) )
2643                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2644                           len = 0;
2645                         }
2646                         else if( len < 255 )
2647                                 key[len++] = ch;
2648                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2649                 break;
2650
2651         case 'f':
2652                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2653                 if( in = fopen (args->infile, "rb") )
2654                   while( ch = getc(in), ch != EOF )
2655                         if( ch == '\n' )
2656                         {
2657                           line++;
2658                           if( args->num == 1 )
2659                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2660
2661                           else if( args->num )
2662                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2663
2664                           if( bt_findkey (bt, key, len) )
2665                                 found++;
2666                           else if( bt->err )
2667                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2668                           len = 0;
2669                         }
2670                         else if( len < 255 )
2671                                 key[len++] = ch;
2672                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2673                 break;
2674
2675         case 's':
2676                 fprintf(stderr, "started scanning\n");
2677                 do {
2678                         if( set->pool = bt_pinpool (bt, page_no) )
2679                                 set->page = bt_page (bt, set->pool, page_no);
2680                         else
2681                                 break;
2682                         set->latch = bt_pinlatch (bt, page_no);
2683                         bt_lockpage (BtLockRead, set->latch);
2684                         next = bt_getid (set->page->right);
2685                         cnt += set->page->act;
2686
2687                         for( slot = 0; slot++ < set->page->cnt; )
2688                          if( next || slot < set->page->cnt )
2689                           if( !slotptr(set->page, slot)->dead ) {
2690                                 ptr = keyptr(set->page, slot);
2691                                 fwrite (ptr->key, ptr->len, 1, stdout);
2692                                 fputc ('\n', stdout);
2693                           }
2694
2695                         bt_unlockpage (BtLockRead, set->latch);
2696                         bt_unpinlatch (set->latch);
2697                         bt_unpinpool (set->pool);
2698                 } while( page_no = next );
2699
2700                 cnt--;  // remove stopper key
2701                 fprintf(stderr, " Total keys read %d\n", cnt);
2702                 break;
2703
2704         case 'c':
2705                 fprintf(stderr, "started counting\n");
2706
2707                 do {
2708                         if( set->pool = bt_pinpool (bt, page_no) )
2709                                 set->page = bt_page (bt, set->pool, page_no);
2710                         else
2711                                 break;
2712                         set->latch = bt_pinlatch (bt, page_no);
2713                         bt_lockpage (BtLockRead, set->latch);
2714                         cnt += set->page->act;
2715                         next = bt_getid (set->page->right);
2716                         bt_unlockpage (BtLockRead, set->latch);
2717                         bt_unpinlatch (set->latch);
2718                         bt_unpinpool (set->pool);
2719                 } while( page_no = next );
2720
2721                 cnt--;  // remove stopper key
2722                 fprintf(stderr, " Total keys read %d\n", cnt);
2723                 break;
2724         }
2725
2726         bt_close (bt);
2727 #ifdef unix
2728         return NULL;
2729 #else
2730         return 0;
2731 #endif
2732 }
2733
2734 typedef struct timeval timer;
2735
2736 int main (int argc, char **argv)
2737 {
2738 int idx, cnt, len, slot, err;
2739 int segsize, bits = 16;
2740 #ifdef unix
2741 pthread_t *threads;
2742 timer start, stop;
2743 #else
2744 time_t start[1], stop[1];
2745 HANDLE *threads;
2746 #endif
2747 double real_time;
2748 ThreadArg *args;
2749 uint poolsize = 0;
2750 int num = 0;
2751 char key[1];
2752 BtMgr *mgr;
2753 BtKey ptr;
2754 BtDb *bt;
2755
2756         if( argc < 3 ) {
2757                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2758                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2759                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2760                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2761                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2762                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2763                 exit(0);
2764         }
2765
2766 #ifdef unix
2767         gettimeofday(&start, NULL);
2768 #else
2769         time(start);
2770 #endif
2771
2772         if( argc > 3 )
2773                 bits = atoi(argv[3]);
2774
2775         if( argc > 4 )
2776                 poolsize = atoi(argv[4]);
2777
2778         if( !poolsize )
2779                 fprintf (stderr, "Warning: no mapped_pool\n");
2780
2781         if( poolsize > 65535 )
2782                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2783
2784         if( argc > 5 )
2785                 segsize = atoi(argv[5]);
2786         else
2787                 segsize = 4;    // 16 pages per mmap segment
2788
2789         if( argc > 6 )
2790                 num = atoi(argv[6]);
2791
2792         cnt = argc - 7;
2793 #ifdef unix
2794         threads = malloc (cnt * sizeof(pthread_t));
2795 #else
2796         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2797 #endif
2798         args = malloc (cnt * sizeof(ThreadArg));
2799
2800         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2801
2802         if( !mgr ) {
2803                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2804                 exit (1);
2805         }
2806
2807         //      fire off threads
2808
2809         for( idx = 0; idx < cnt; idx++ ) {
2810                 args[idx].infile = argv[idx + 7];
2811                 args[idx].type = argv[2][0];
2812                 args[idx].mgr = mgr;
2813                 args[idx].num = num;
2814                 args[idx].idx = idx;
2815 #ifdef unix
2816                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2817                         fprintf(stderr, "Error creating thread %d\n", err);
2818 #else
2819                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2820 #endif
2821         }
2822
2823         //      wait for termination
2824
2825 #ifdef unix
2826         for( idx = 0; idx < cnt; idx++ )
2827                 pthread_join (threads[idx], NULL);
2828         gettimeofday(&stop, NULL);
2829         real_time = 1000.0 * ( stop.tv_sec - start.tv_sec ) + 0.001 * (stop.tv_usec - start.tv_usec );
2830 #else
2831         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2832
2833         for( idx = 0; idx < cnt; idx++ )
2834                 CloseHandle(threads[idx]);
2835
2836         time (stop);
2837         real_time = 1000 * (*stop - *start);
2838 #endif
2839         fprintf(stderr, " Time to complete: %.2f seconds\n", real_time/1000);
2840         bt_mgrclose (mgr);
2841 }
2842
2843 #endif  //STANDALONE