]> pd.if.org Git - btree/blob - threads2j.c
fix original problem with bt_deletekey, obviating need for btree2t.c
[btree] / threads2j.c
1 // btree version threads2j linux futex concurrency version
2 //      with reworked bt_deletekey
3 // 12 FEB 2014
4
5 // author: karl malbrain, malbrain@cal.berkeley.edu
6
7 /*
8 This work, including the source code, documentation
9 and related data, is placed into the public domain.
10
11 The original author is Karl Malbrain.
12
13 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
14 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
15 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
16 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
17 RESULTING FROM THE USE, MODIFICATION, OR
18 REDISTRIBUTION OF THIS SOFTWARE.
19 */
20
21 // Please see the project home page for documentation
22 // code.google.com/p/high-concurrency-btree
23
24 #define _FILE_OFFSET_BITS 64
25 #define _LARGEFILE64_SOURCE
26
27 #ifdef linux
28 #define _GNU_SOURCE
29 #include <linux/futex.h>
30 #define SYS_futex 202
31 #endif
32
33 #ifdef unix
34 #include <unistd.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <fcntl.h>
38 #include <sys/time.h>
39 #include <sys/mman.h>
40 #include <errno.h>
41 #include <pthread.h>
42 #include <limits.h>
43 #else
44 #define WIN32_LEAN_AND_MEAN
45 #include <windows.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #include <time.h>
49 #include <fcntl.h>
50 #include <process.h>
51 #include <intrin.h>
52 #endif
53
54 #include <memory.h>
55 #include <string.h>
56 #include <stddef.h>
57
// 64-bit unsigned integer type used for page numbers and row ids
58 typedef unsigned long long      uid;
59
// Builds without the `unix` macro declare these names themselves.
// NOTE(review): the unix build presumably gets them from its system headers — confirm.
60 #ifndef unix
61 typedef unsigned long long      off64_t;
62 typedef unsigned short          ushort;
63 typedef unsigned int            uint;
64 #endif
65
// open modes passed as the `mode` argument of bt_mgr; each value is the
// pair of ASCII letters named in its trailing comment
66 #define BT_ro 0x6f72    // ro
67 #define BT_rw 0x7772    // rw
68
69 #define BT_latchtable   128                                     // number of latch manager slots
70
// page sizes are powers of two; bt_mgr clamps the requested bit count
// into the range [BT_minbits, BT_maxbits]
71 #define BT_maxbits              24                                      // maximum page size in bits
72 #define BT_minbits              9                                       // minimum page size in bits
73 #define BT_minpage              (1 << BT_minbits)       // minimum page size
74 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
75
76 /*
77 There are five lock types for each node in three independent sets: 
78 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
79 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
80 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
81 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
82 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
83 */
84
// The five page lock kinds described in the table above.
85 typedef enum{
86         BtLockAccess,           // set 1, sharable: intent to read the node
87         BtLockDelete,           // set 1, exclusive: about to release the node
88         BtLockRead,             // set 2, sharable: read the node
89         BtLockWrite,            // set 2, exclusive: modify the node
90         BtLockParent            // set 3, exclusive: change the node's parent keys
91 } BtLock;
92
93 //      mode & definition for latch implementation
94
// Masks and unit increments applied to a BtLatch when it is accessed as a
// single uint by the atomic builtins in the latch functions below.
// Share, PendRd and PendWr are the lowest bit of the share, readwait and
// writewait counters respectively, so adding/subtracting them changes
// the corresponding counter by one.
// NOTE(review): as written, `LockMode` is a variable of this anonymous
// enum type, not a type name.
95 enum {
96         Mutex = 1 << 0,         // the mutex bit
97         Write = 1 << 1,         // the writers bit
98         Share = 1 << 2,         // reader count
99         PendRd = 1 << 12,       // reader contended count
100         PendWr = 1 << 22        // writer contended count
101 } LockMode;
102
// Bitset values passed as the final argument of the FUTEX_WAIT_BITSET /
// FUTEX_WAKE_BITSET calls so that readers and writers sleeping on the
// same latch word can be woken separately.
// NOTE(review): `RWQueue` is likewise a variable, not a type name.
103 enum {
104         QueRd = 1,      // reader queue
105         QueWr = 2       // writer queue
106 } RWQueue;
107
108 // share is count of read accessors
109 // grant write lock when share == 0
110
// Reader/writer latch packed into 32 bits of bit-fields.
// The latch functions also cast a BtLatch* to uint* and manipulate it with
// the Mutex/Write/Share/PendRd/PendWr constants; that only lines up with
// these fields if the compiler allocates bit-fields starting at the least
// significant bit (an assumption made by the code, not guaranteed by C).
111 typedef struct {
112         volatile uint mutex:1;          // 1 = busy
113         volatile uint write:1;          // 1 = exclusive
114         volatile uint share:10;         // count of readers holding locks
115         volatile uint readwait:10;      // count of readers waiting
116         volatile uint writewait:10;     // count of writers waiting
117 } BtLatch;
118
119 //      Define the length of the page and key pointers
120
121 #define BtId 6          // bytes used to store a page number or row id (see bt_putid/bt_getid)
122
123 //      Page key slot definition.
124
125 //      If BT_maxbits is 15 or less, you can save 4 bytes
126 //      for each key stored by making the first two uints
127 //      into ushorts.  You can also save 4 bytes by removing
128 //      the tod field from the key.
129
130 //      Keys are marked dead, but remain on the page until
131 //      cleanup is called. The fence key (highest key) for
132 //      the page is always present, even after cleanup.
133
// One entry of the slot array that follows the page header (see slotptr).
134 typedef struct {
135         uint off:BT_maxbits;            // page offset for key start
136         uint dead:1;                            // set for deleted key
137         uint tod;                                       // time-stamp for key
138         unsigned char id[BtId];         // id associated with key
139 } BtSlot;
140
141 //      The key structure occupies space at the upper end of
142 //      each page.  It's a length byte followed by the value
143 //      bytes.
144
// NOTE: BtKey is a pointer type (pointer to the anonymous struct).
145 typedef struct {
146         unsigned char len;              // number of key bytes that follow
147         unsigned char key[1];           // first key byte; declared with one element only
148 } *BtKey;
149
150 //      The first part of an index page.
151 //      It is immediately followed
152 //      by the BtSlot array of keys.
153
// Page header.  NOTE: BtPage is a pointer type (struct BtPage_ *).
154 typedef struct BtPage_ {
155         uint cnt;                                       // count of keys in page
156         uint act;                                       // count of active keys
157         uint min;                                       // next key offset
158         unsigned char bits:7;           // page size in bits
159         unsigned char free:1;           // page is on free list
160         unsigned char lvl:4;            // level of page
161         unsigned char kill:1;           // page is being deleted
162         unsigned char dirty:1;          // page has deleted keys
163         unsigned char posted:1;         // page fence has posted
164         unsigned char goright:1;        // page is being killed, continue to right
165         unsigned char right[BtId];      // page number to right
166         unsigned char fence[256];       // page fence key
167 } *BtPage;
168
169 //  hash table entries
170
// One bucket of the latch manager hash table: a latch guarding the chain
// and the index of the first BtLatchSet on it (0 = empty chain).
171 typedef struct {
172         BtLatch latch[1];
173         volatile ushort slot;           // Latch table entry at head of chain
174 } BtHashEntry;
175
176 //      latch manager table structure
177
// The latches for one page.  Entries are linked into a hash chain by
// index; index 0 is never used as an entry and marks the end of a chain
// in next/prev.
178 typedef struct {
179         BtLatch readwr[1];                      // read/write page lock
180         BtLatch access[1];                      // Access Intent/Page delete
181         BtLatch parent[1];                      // adoption of foster children
182         BtLatch busy[1];                        // slot is being moved between chains
183         volatile ushort next;           // next entry in hash table chain
184         volatile ushort prev;           // prev entry in hash table chain
185         volatile ushort pin;            // number of outstanding locks
186         volatile ushort hash;           // hash slot entry is under
187         volatile uid page_no;           // latch set page number
188 } BtLatchSet;
189
190 //      The memory mapping pool table buffer manager entry
191
// One memory-mapped segment of the index file held in the buffer pool.
192 typedef struct {
193         unsigned long long int lru;     // number of times accessed
194         uid  basepage;                          // mapped base page number
195         char *map;                                      // mapped memory pointer
196         ushort slot;                            // slot index in this array
197         ushort pin;                                     // mapped page pin counter
198         void *hashprev;                         // previous pool entry for the same hash idx
199         void *hashnext;                         // next pool entry for the same hash idx
200 #ifndef unix
201         HANDLE hmap;                            // Windows memory mapping handle
202 #endif
203 } BtPool;
204
205 //  The loadpage interface object
206
// Groups the page number, page pointer, pool entry and latch set that
// describe one page.
207 typedef struct {
208         uid page_no;            // current page number
209         BtPage page;            // current page pointer
210         BtPool *pool;           // current page pool
211         BtLatchSet *latch;      // current page latch set
212 } BtPageSet;
213
214 //      structure for latch manager on ALLOC_page
215
// Layout of the allocation page: two page headers followed by the latch
// manager bookkeeping and the latch hash table, which occupies the
// remainder of the page (table is a zero-length trailing array).
216 typedef struct {
217         struct BtPage_ alloc[2];        // next & free page_nos in right ptr
218         BtLatch lock[1];                        // allocation area lite latch
219         ushort latchdeployed;           // highest number of latch entries deployed
220         ushort nlatchpage;                      // number of latch pages at BT_latch
221         ushort latchtotal;                      // number of page latch entries
222         ushort latchhash;                       // number of latch hash table slots
223         ushort latchvictim;                     // next latch entry to examine
224         BtHashEntry table[0];           // the hash table
225 } BtLatchMgr;
226
227 //      The object structure for Btree access
228
// Buffer manager state created by bt_mgr and released by bt_mgrclose.
229 typedef struct {
230         uint page_size;                         // page size    
231         uint page_bits;                         // page size in bits    
232         uint seg_bits;                          // seg size in pages in bits
233         uint mode;                                      // read-write mode
234 #ifdef unix
235         int idx;                                // file descriptor of the index file
236 #else
237         HANDLE idx;                             // file handle of the index file
238 #endif
239         ushort poolcnt;                         // highest page pool node in use
240         ushort poolmax;                         // highest page pool node allocated
241         ushort poolmask;                        // total number of pages in mmap segment - 1
242         ushort evicted;                         // last evicted hash table slot
243         ushort hashsize;                        // size of Hash Table for pool entries
244         ushort *hash;                           // pool index for hash entries
245         BtLatch *latch;                         // latches for pool hash slots
246         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
247         BtLatchSet *latchsets;          // mapped latch set from latch pages
248         BtPool *pool;                           // memory pool page segments
249 #ifndef unix
250         HANDLE halloc;                          // allocation and latch table handle
251 #endif
252 } BtMgr;
253
// Handle returned by bt_open and released by bt_close.
254 typedef struct {
255         BtMgr *mgr;                     // buffer manager for thread
256         BtPage cursor;          // cached frame for start/next (never mapped)
257         BtPage frame;           // spare frame for the page split (never mapped)
258         BtPage zero;            // page of zeroes to extend the file (never mapped)
259         uid cursor_page;        // current cursor page number   
260         unsigned char *mem;     // frame, cursor, page memory buffer
261         int found;                      // last delete or insert was found
262         int err;                        // last error
263 } BtDb;
264
// Error codes returned by the B-tree functions declared below.
// NOTE(review): BTERR_ok presumably means success; the meaning of the
// other codes should be confirmed against the code that returns them.
265 typedef enum {
266         BTERR_ok = 0,
267         BTERR_struct,
268         BTERR_ovflw,
269         BTERR_lock,
270         BTERR_map,
271         BTERR_wrt,
272         BTERR_hash
273 } BTERR;
274
275 // B-Tree functions
276 extern void bt_close (BtDb *bt);                // frees bt->mem and bt
277 extern BtDb *bt_open (BtMgr *mgr);
278 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl);
279 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len);
280 extern uid bt_findkey    (BtDb *bt, unsigned char *key, uint len);
281 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
282 extern uint bt_nextkey   (BtDb *bt, uint slot);
283
284 //      internal functions
285 BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence);
286
287 //      manager functions
//      bt_mgr returns NULL on failure; bt_mgrclose unmaps, closes and
//      frees everything owned by mgr, including mgr itself
288 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
289 void bt_mgrclose (BtMgr *mgr);
290
291 //  Helper functions to return slot values
292
293 extern BtKey bt_key (BtDb *bt, uint slot);
294 extern uid bt_uid (BtDb *bt, uint slot);
295 extern uint bt_tod (BtDb *bt, uint slot);
296
297 //  BTree page number constants
//  fixed page numbers within the index file
298 #define ALLOC_page              0       // allocation & lock manager hash table
299 #define ROOT_page               1       // root of the btree
300 #define LEAF_page               2       // first page of leaves
301 #define LATCH_page              3       // pages for lock manager
302
303 //      Number of levels to create in a new BTree
//      (bt_mgr writes one initial page per level when creating the file)
304
305 #define MIN_lvl                 2
306
307 //  The page is allocated from low and hi ends.
308 //  The key offsets and row-id's are allocated
309 //  from the bottom, while the text of the key
310 //  is allocated from the top.  When the two
311 //  areas meet, the page is split into two.
312
313 //  A key consists of a length byte, two bytes of
314 //  index number (0 - 65534), and up to 253 bytes
315 //  of key value.  Duplicate keys are discarded.
316 //  Associated with each key is a 48 bit row-id.
317
318 //  The b-tree root is always located at page 1.
319 //      The first leaf page of level zero is always
320 //      located on page 2.
321
322 //      The b-tree pages are linked with next
323 //      pointers to facilitate enumerators,
324 //      and provide for concurrency.
325
326 //      When the root page fills, it is split in two and
327 //      the tree height is raised by a new root at page
328 //      one with two keys.
329
330 //      Deleted keys are marked with a dead bit until
331 //      page cleanup. The fence key for a node is
332 //      present in a special array.
333
334 //  Groups of pages called segments from the btree are optionally
335 //  cached with a memory mapped pool. A hash table is used to keep
336 //  track of the cached segments.  This behaviour is controlled
337 //  by the poolsize, segsize and hashsize parameters to bt_mgr.
338
339 //  To achieve maximum concurrency one page is locked at a time
340 //  as the tree is traversed to find leaf key in question. The right
341 //  page numbers are used in cases where the page is being split,
342 //      or consolidated.
343
344 //  Page 0 is dedicated to lock for new page extensions,
345 //      and chains empty pages together for reuse.
346
347 //      The ParentModification lock on a node is obtained to serialize posting
348 //      or changing the fence key for a node.
349
350 //      Empty pages are chained together through the ALLOC page and reused.
351
352 //      Access macros to address slot and key values from the page
353 //      Page slots use 1 based indexing.
354
//	slotptr(page, slot): address of the BtSlot for a 1-based slot number;
//	the slot array begins immediately after the BtPage_ header.
//	keyptr(page, slot): address of the key for a slot, located at the
//	slot's byte offset (off) from the start of the page.
//	Every argument is parenthesized so that arbitrary expressions (e.g.
//	a conditional or a cast) expand correctly.  Note that keyptr
//	evaluates `page` twice, so avoid arguments with side effects.

#define slotptr(page, slot) (((BtSlot *)((page)+1)) + ((slot)-1))
#define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
357
358 void bt_putid(unsigned char *dest, uid id)
359 {
360 int i = BtId;
361
362         while( i-- )
363                 dest[i] = (unsigned char)id, id >>= 8;
364 }
365
366 uid bt_getid(unsigned char *src)
367 {
368 uid id = 0;
369 int i;
370
371         for( i = 0; i < BtId; i++ )
372                 id <<= 8, id |= *src++; 
373
374         return id;
375 }
376
377 //      Latch Manager
378
//	Thin wrapper that forwards its six arguments unchanged to the
//	futex system call and returns the call's result as an int.

int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
{
int result;

	result = syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
	return result;
}
383
384 //      wait until write lock mode is clear
385 //      and add 1 to the share count
386
//      latch   - latch to acquire in shared mode
//      private - nonzero adds FUTEX_PRIVATE_FLAG to the futex operation
387 void bt_spinreadlock(BtLatch *latch, int private)
388 {
389 uint prev;
390
391   if( private )
392         private = FUTEX_PRIVATE_FLAG;
393
394   while( 1 ) {
395         //      obtain latch mutex
        //      (atomically set the Mutex bit; if it was already set, yield and retry)
396         if( __sync_fetch_and_or((uint *)latch, Mutex) & Mutex ) {
397                 sched_yield();
398                 continue;
399         }
400
401         //  wait for writers to clear
402         //      increment read waiters and wait
        //      prev is the latch word with the mutex bit cleared; the futex
        //      wait only sleeps if the word still holds that value.
        //      After returning, drop the waiter count and start over.
403
404         if( latch->write || latch->writewait ) {
405                 __sync_fetch_and_add ((uint *)latch, PendRd);
406                 prev = __sync_fetch_and_and ((uint *)latch, ~Mutex) & ~Mutex;
407                 sys_futex( (uint *)latch, FUTEX_WAIT_BITSET | private, prev, NULL, NULL, QueRd );
408                 __sync_fetch_and_sub ((uint *)latch, PendRd);
409                 continue;
410         }
411         
412         // increment reader lock count
413         // and release latch mutex
414
415         __sync_fetch_and_add ((uint *)latch, Share);
416         __sync_fetch_and_and ((uint *)latch, ~Mutex);
417         return;
418   }
419 }
420
421 //      wait for other read and write latches to relinquish
422
//      latch   - latch to acquire in exclusive mode
//      private - nonzero adds FUTEX_PRIVATE_FLAG to the futex operation
423 void bt_spinwritelock(BtLatch *latch, int private)
424 {
425 uint prev;
426
427   if( private )
428         private = FUTEX_PRIVATE_FLAG;
429
430   while( 1 ) {
431         //      obtain latch mutex
        //      (atomically set the Mutex bit; if it was already set, yield and retry)
432         if( __sync_fetch_and_or((uint *)latch, Mutex) & Mutex ) {
433                 sched_yield();
434                 continue;
435         }
436
437         //      wait for write and reader count to clear
        //      register as a waiting writer, release the mutex bit and sleep
        //      on the writer queue while the word still equals prev;
        //      afterwards drop the waiter count and start over
438
439         if( latch->write || latch->share ) {
440                 __sync_fetch_and_add ((uint *)latch, PendWr);
441                 prev = __sync_fetch_and_and ((uint *)latch, ~Mutex) & ~Mutex;
442                 sys_futex( (uint *)latch, FUTEX_WAIT_BITSET | private, prev, NULL, NULL, QueWr );
443                 __sync_fetch_and_sub ((uint *)latch, PendWr);
444                 continue;
445         }
446         
447         //      take write mutex
448         //      release latch mutex
449
450         __sync_fetch_and_or ((uint *)latch, Write);
451         __sync_fetch_and_and ((uint *)latch, ~Mutex);
452         return;
453   }
454 }
455
456 //      try to obtain write lock
457
458 //      return 1 if obtained,
459 //              0 otherwise
460
461 int bt_spinwritetry(BtLatch *latch)
462 {
463 int ans;
464
465         //      try for mutex,
466         //      abandon request if not taken
467
468         if( __sync_fetch_and_or((uint *)latch, Mutex) & Mutex )
469                 return 0;
470
471         //      see if write mode is available
472
473         if( !latch->write && !latch->share ) {
474                 __sync_fetch_and_or ((uint *)latch, Write);
475                 ans = 1;
476         } else
477                 ans = 0;
478
479         // release latch mutex
480
481         __sync_fetch_and_and ((uint *)latch, ~Mutex);
482         return ans;
483 }
484
485 //      clear write lock
486
//      latch   - latch whose write bit is to be cleared
//      private - nonzero adds FUTEX_PRIVATE_FLAG to the futex operation
487 void bt_spinreleasewrite(BtLatch *latch, int private)
488 {
489   if( private )
490         private = FUTEX_PRIVATE_FLAG;
491
492         //      obtain latch mutex
493
494         while( __sync_fetch_and_or((uint *)latch, Mutex) & Mutex )
495                 sched_yield();
496
497         __sync_fetch_and_and ((uint *)latch, ~Write);
498
499         // favor writers
        // wake at most one waiting writer; if the wake call returns
        // nonzero, skip waking the readers
500
501         if( latch->writewait )
502           if( sys_futex( (uint *)latch, FUTEX_WAKE_BITSET | private, 1, NULL, NULL, QueWr ) )
503                 goto wakexit;
504
        // otherwise wake every waiting reader
505         if( latch->readwait )
506                 sys_futex( (uint *)latch, FUTEX_WAKE_BITSET | private, INT_MAX, NULL, NULL, QueRd );
507
508         // release latch mutex
509
510 wakexit:
511         __sync_fetch_and_and ((uint *)latch, ~Mutex);
512 }
513
514 //      decrement reader count
515
//      latch   - latch whose reader count is to be decremented
//      private - nonzero adds FUTEX_PRIVATE_FLAG to the futex operation
516 void bt_spinreleaseread(BtLatch *latch, int private)
517 {
518   if( private )
519         private = FUTEX_PRIVATE_FLAG;
520
521         //      obtain latch mutex
522
523         while( __sync_fetch_and_or((uint *)latch, Mutex) & Mutex )
524                 sched_yield();
525
526         __sync_fetch_and_sub ((uint *)latch, Share);
527
528         // wake waiting writers
        // (only when this was the last reader; at most one writer is woken)
529
530         if( !latch->share && latch->writewait )
531                 sys_futex( (uint *)latch, FUTEX_WAKE_BITSET | private, 1, NULL, NULL, QueWr );
532
533         // release latch mutex
534
535         __sync_fetch_and_and ((uint *)latch, ~Mutex);
536 }
537
538 //      link latch table entry into latch hash table
539
540 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
541 {
542 BtLatchSet *set = bt->mgr->latchsets + victim;
543
544         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
545                 bt->mgr->latchsets[set->next].prev = victim;
546
547         bt->mgr->latchmgr->table[hashidx].slot = victim;
548         set->page_no = page_no;
549         set->hash = hashidx;
550         set->prev = 0;
551 }
552
553 //      release latch pin
554
555 void bt_unpinlatch (BtLatchSet *set)
556 {
557 #ifdef unix
558         __sync_fetch_and_add(&set->pin, -1);
559 #else
560         _InterlockedDecrement16 (&set->pin);
561 #endif
562 }
563
564 //      find existing latchset or inspire new one
565 //      return with latchset pinned
566
//      bt      - handle whose manager owns the latch tables
//      page_no - page whose latch set is wanted
//      Returns the latch set with its pin count incremented.  The final
//      loop keeps scanning until it can take over an unpinned entry.
567 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
568 {
569 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
570 ushort slot, avail = 0, victim, idx;
571 BtLatchSet *set;
572
573         //  obtain read lock on hash table entry
574
575         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch, 0);
576
        //  walk the chain looking for page_no; slot is 0 if not found
577         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
578         {
579                 set = bt->mgr->latchsets + slot;
580                 if( page_no == set->page_no )
581                         break;
582         } while( slot = set->next );
583
        //  found: pin it while the chain is still read locked
584         if( slot ) {
585 #ifdef unix
586                 __sync_fetch_and_add(&set->pin, 1);
587 #else
588                 _InterlockedIncrement16 (&set->pin);
589 #endif
590         }
591
592     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch, 0);
593
594         if( slot )
595                 return set;
596
597   //  try again, this time with write lock
  //  (the chain is searched again under the write lock; avail remembers
  //  the first unpinned entry seen on the chain)
598
599   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch, 0);
600
601   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
602   {
603         set = bt->mgr->latchsets + slot;
604         if( page_no == set->page_no )
605                 break;
606         if( !set->pin && !avail )
607                 avail = slot;
608   } while( slot = set->next );
609
610   //  found our entry, or take over an unpinned one
  //  (the entry is already on this chain, so only page_no is rewritten)
611
612   if( slot || (slot = avail) ) {
613         set = bt->mgr->latchsets + slot;
614 #ifdef unix
615         __sync_fetch_and_add(&set->pin, 1);
616 #else
617         _InterlockedIncrement16 (&set->pin);
618 #endif
619         set->page_no = page_no;
620         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch, 0);
621         return set;
622   }
623
624         //  see if there are any unused entries
        //  (victim is the incremented latchdeployed value on both platforms)
625 #ifdef unix
626         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
627 #else
628         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
629 #endif
630
631         if( victim < bt->mgr->latchmgr->latchtotal ) {
632                 set = bt->mgr->latchsets + victim;
633 #ifdef unix
634                 __sync_fetch_and_add(&set->pin, 1);
635 #else
636                 _InterlockedIncrement16 (&set->pin);
637 #endif
638                 bt_latchlink (bt, hashidx, victim, page_no);
639                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch, 0);
640                 return set;
641         }
642
        //  table is full: undo the increment (the value stored in victim
        //  here is overwritten at the top of the loop below)
643 #ifdef unix
644         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
645 #else
646         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
647 #endif
648   //  find and reuse previous lock entry
  //  (the write lock on chain hashidx is still held throughout this loop)
649
650   while( 1 ) {
651 #ifdef unix
652         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
653 #else
654         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
655 #endif
656         //      we don't use slot zero
657
658         if( victim %= bt->mgr->latchmgr->latchtotal )
659                 set = bt->mgr->latchsets + victim;
660         else
661                 continue;
662
663         //      take control of our slot
664         //      from other threads
665
666         if( set->pin || !bt_spinwritetry (set->busy) )
667                 continue;
668
669         idx = set->hash;
670
671         // try to get write lock on hash chain
672         //      skip entry if not obtained
673         //      or has outstanding locks
        //      (the try also fails when idx == hashidx, since this thread
        //      already holds that chain's write lock)
674
675         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
676                 bt_spinreleasewrite (set->busy, 0);
677                 continue;
678         }
679
        //  re-check the pin count now that the entry's chain is locked
680         if( set->pin ) {
681                 bt_spinreleasewrite (set->busy, 0);
682                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch, 0);
683                 continue;
684         }
685
686         //  unlink our available victim from its hash chain
687
688         if( set->prev )
689                 bt->mgr->latchsets[set->prev].next = set->next;
690         else
691                 bt->mgr->latchmgr->table[idx].slot = set->next;
692
693         if( set->next )
694                 bt->mgr->latchsets[set->next].prev = set->prev;
695
696         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch, 0);
        //  pin the entry and link it at the head of the chain for page_no
697 #ifdef unix
698         __sync_fetch_and_add(&set->pin, 1);
699 #else
700         _InterlockedIncrement16 (&set->pin);
701 #endif
702         bt_latchlink (bt, hashidx, victim, page_no);
703         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch, 0);
704         bt_spinreleasewrite (set->busy, 0);
705         return set;
706   }
707 }
708
709 void bt_mgrclose (BtMgr *mgr)
710 {
711 BtPool *pool;
712 uint slot;
713
714         // release mapped pages
715         //      note that slot zero is never used
716
717         for( slot = 1; slot < mgr->poolmax; slot++ ) {
718                 pool = mgr->pool + slot;
719                 if( pool->slot )
720 #ifdef unix
721                         munmap (pool->map, (mgr->poolmask+1) << mgr->page_bits);
722 #else
723                 {
724                         FlushViewOfFile(pool->map, 0);
725                         UnmapViewOfFile(pool->map);
726                         CloseHandle(pool->hmap);
727                 }
728 #endif
729         }
730
731 #ifdef unix
732         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
733         munmap (mgr->latchmgr, mgr->page_size);
734 #else
735         FlushViewOfFile(mgr->latchmgr, 0);
736         UnmapViewOfFile(mgr->latchmgr);
737         CloseHandle(mgr->halloc);
738 #endif
739 #ifdef unix
740         close (mgr->idx);
741         free (mgr->pool);
742         free (mgr->hash);
743         free (mgr->latch);
744         free (mgr);
745 #else
746         FlushFileBuffers(mgr->idx);
747         CloseHandle(mgr->idx);
748         GlobalFree (mgr->pool);
749         GlobalFree (mgr->hash);
750         GlobalFree (mgr->latch);
751         GlobalFree (mgr);
752 #endif
753 }
754
755 //      close and release memory
756
757 void bt_close (BtDb *bt)
758 {
759 #ifdef unix
760         if ( bt->mem )
761                 free (bt->mem);
762 #else
763         if ( bt->mem)
764                 VirtualFree (bt->mem, 0, MEM_RELEASE);
765 #endif
766         free (bt);
767 }
768
769 //  open/create new btree buffer manager
770
771 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
772 //              size of mapped page pool (e.g. 8192)
773
774 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
775 {
776 uint lvl, attr, cacheblk, last, slot, idx;
777 uint nlatchpage, latchhash;
778 BtLatchMgr *latchmgr;
779 off64_t size;
780 uint amt[1];
781 BtMgr* mgr;
782 BtKey key;
783 int flag;
784 #ifndef unix
785 SYSTEM_INFO sysinfo[1];
786 #endif
787
788         // determine sanity of page size and buffer pool
789
790         if( bits > BT_maxbits )
791                 bits = BT_maxbits;
792         else if( bits < BT_minbits )
793                 bits = BT_minbits;
794
795         if( !poolmax )
796                 return NULL;    // must have buffer pool
797
798 #ifdef unix
799         mgr = calloc (1, sizeof(BtMgr));
800         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
801
802         if( mgr->idx == -1 )
803                 return free(mgr), NULL;
804         
805         cacheblk = 4096;        // minimum mmap segment size for unix
806
807 #else
808         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
809         attr = FILE_ATTRIBUTE_NORMAL;
810         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
811
812         if( mgr->idx == INVALID_HANDLE_VALUE )
813                 return GlobalFree(mgr), NULL;
814
815         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
816         GetSystemInfo(sysinfo);
817         cacheblk = sysinfo->dwAllocationGranularity;
818 #endif
819
820 #ifdef unix
821         latchmgr = malloc (BT_maxpage);
822         *amt = 0;
823
824         // read minimum page size to get root info
825
826         if( size = lseek (mgr->idx, 0L, 2) ) {
827                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
828                         bits = latchmgr->alloc->bits;
829                 else
830                         return free(mgr), free(latchmgr), NULL;
831         } else if( mode == BT_ro )
832                 return free(latchmgr), free (mgr), NULL;
833 #else
834         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
835         size = GetFileSize(mgr->idx, amt);
836
837         if( size || *amt ) {
838                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
839                         return bt_mgrclose (mgr), NULL;
840                 bits = latchmgr->alloc->bits;
841         } else if( mode == BT_ro )
842                 return bt_mgrclose (mgr), NULL;
843 #endif
844
845         mgr->page_size = 1 << bits;
846         mgr->page_bits = bits;
847
848         mgr->poolmax = poolmax;
849         mgr->mode = mode;
850
851         if( cacheblk < mgr->page_size )
852                 cacheblk = mgr->page_size;
853
854         //  mask for partial memmaps
855
856         mgr->poolmask = (cacheblk >> bits) - 1;
857
858         //      see if requested size of pages per memmap is greater
859
860         if( (1 << segsize) > mgr->poolmask )
861                 mgr->poolmask = (1 << segsize) - 1;
862
863         mgr->seg_bits = 0;
864
865         while( (1 << mgr->seg_bits) <= mgr->poolmask )
866                 mgr->seg_bits++;
867
868         mgr->hashsize = hashsize;
869
870 #ifdef unix
871         mgr->pool = calloc (poolmax, sizeof(BtPool));
872         mgr->hash = calloc (hashsize, sizeof(ushort));
873         mgr->latch = calloc (hashsize, sizeof(BtLatch));
874 #else
875         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
876         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
877         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtLatch));
878 #endif
879
880         if( size || *amt )
881                 goto mgrlatch;
882
883         // initialize an empty b-tree with latch page, root page, page of leaves
884         // and page(s) of latches
885
886         memset (latchmgr, 0, 1 << bits);
887         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
888         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
889         latchmgr->alloc->bits = mgr->page_bits;
890
891         latchmgr->nlatchpage = nlatchpage;
892         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
893
894         //  initialize latch manager
895
896         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
897
898         //      size of hash table = total number of latchsets
899
900         if( latchhash > latchmgr->latchtotal )
901                 latchhash = latchmgr->latchtotal;
902
903         latchmgr->latchhash = latchhash;
904
905 #ifdef unix
906         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
907                 return bt_mgrclose (mgr), NULL;
908 #else
909         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
910                 return bt_mgrclose (mgr), NULL;
911
912         if( *amt < mgr->page_size )
913                 return bt_mgrclose (mgr), NULL;
914 #endif
915
916         memset (latchmgr, 0, 1 << bits);
917         latchmgr->alloc->bits = mgr->page_bits;
918
919         for( lvl=MIN_lvl; lvl--; ) {
920                 slotptr(latchmgr->alloc, 1)->off = offsetof(struct BtPage_, fence);
921                 bt_putid(slotptr(latchmgr->alloc, 1)->id, lvl ? MIN_lvl - lvl + 1 : 0);         // next(lower) page number
922                 latchmgr->alloc->fence[0] = 2;                  // create stopper key
923                 latchmgr->alloc->fence[1] = 0xff;
924                 latchmgr->alloc->fence[2] = 0xff;
925                 latchmgr->alloc->min = mgr->page_size;
926                 latchmgr->alloc->lvl = lvl;
927                 latchmgr->alloc->cnt = 1;
928                 latchmgr->alloc->act = 1;
929 #ifdef unix
930                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
931                         return bt_mgrclose (mgr), NULL;
932 #else
933                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
934                         return bt_mgrclose (mgr), NULL;
935
936                 if( *amt < mgr->page_size )
937                         return bt_mgrclose (mgr), NULL;
938 #endif
939         }
940
941         // clear out latch manager locks
942         //      and rest of pages to round out segment
943
944         memset(latchmgr, 0, mgr->page_size);
945         last = MIN_lvl + 1;
946
947         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
948 #ifdef unix
949                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
950 #else
951                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
952                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
953                         return bt_mgrclose (mgr), NULL;
954                 if( *amt < mgr->page_size )
955                         return bt_mgrclose (mgr), NULL;
956 #endif
957                 last++;
958         }
959
960 mgrlatch:
961 #ifdef unix
962         flag = PROT_READ | PROT_WRITE;
963         mgr->latchmgr = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
964         if( mgr->latchmgr == MAP_FAILED )
965                 return bt_mgrclose (mgr), NULL;
966         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
967         if( mgr->latchsets == MAP_FAILED )
968                 return bt_mgrclose (mgr), NULL;
969 #else
970         flag = PAGE_READWRITE;
971         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
972         if( !mgr->halloc )
973                 return bt_mgrclose (mgr), NULL;
974
975         flag = FILE_MAP_WRITE;
976         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
977         if( !mgr->latchmgr )
978                 return GetLastError(), bt_mgrclose (mgr), NULL;
979
980         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
981 #endif
982
983 #ifdef unix
984         free (latchmgr);
985 #else
986         VirtualFree (latchmgr, 0, MEM_RELEASE);
987 #endif
988         return mgr;
989 }
990
991 //      open BTree access method
992 //      based on buffer manager
993
994 BtDb *bt_open (BtMgr *mgr)
995 {
996 BtDb *bt = malloc (sizeof(*bt));
997
998         memset (bt, 0, sizeof(*bt));
999         bt->mgr = mgr;
1000 #ifdef unix
1001         bt->mem = malloc (3 *mgr->page_size);
1002 #else
1003         bt->mem = VirtualAlloc(NULL, 3 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1004 #endif
1005         bt->frame = (BtPage)bt->mem;
1006         bt->zero = (BtPage)(bt->mem + 1 * mgr->page_size);
1007         bt->cursor = (BtPage)(bt->mem + 2 * mgr->page_size);
1008
1009         memset (bt->zero, 0, mgr->page_size);
1010         return bt;
1011 }
1012
1013 //  compare two keys, returning > 0, = 0, or < 0
1014 //  as the comparison value
1015
1016 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1017 {
1018 uint len1 = key1->len;
1019 int ans;
1020
1021         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1022                 return ans;
1023
1024         if( len1 > len2 )
1025                 return 1;
1026         if( len1 < len2 )
1027                 return -1;
1028
1029         return 0;
1030 }
1031
1032 //      Buffer Pool mgr
1033
1034 // find segment in pool
1035 // must be called with hashslot idx locked
1036 //      return NULL if not there
1037 //      otherwise return node
1038
1039 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1040 {
1041 BtPool *pool;
1042 uint slot;
1043
1044         // compute start of hash chain in pool
1045
1046         if( slot = bt->mgr->hash[idx] ) 
1047                 pool = bt->mgr->pool + slot;
1048         else
1049                 return NULL;
1050
1051         page_no &= ~bt->mgr->poolmask;
1052
1053         while( pool->basepage != page_no )
1054           if( pool = pool->hashnext )
1055                 continue;
1056           else
1057                 return NULL;
1058
1059         return pool;
1060 }
1061
1062 // add segment to hash table
1063
1064 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1065 {
1066 BtPool *node;
1067 uint slot;
1068
1069         pool->hashprev = pool->hashnext = NULL;
1070         pool->basepage = page_no & ~bt->mgr->poolmask;
1071         pool->lru = 1;
1072
1073         if( slot = bt->mgr->hash[idx] ) {
1074                 node = bt->mgr->pool + slot;
1075                 pool->hashnext = node;
1076                 node->hashprev = pool;
1077         }
1078
1079         bt->mgr->hash[idx] = pool->slot;
1080 }
1081
1082 //      find best segment to evict from buffer pool
1083
1084 BtPool *bt_findlru (BtDb *bt, uint hashslot)
1085 {
1086 unsigned long long int target = ~0LL;
1087 BtPool *pool = NULL, *node;
1088
1089         if( !hashslot )
1090                 return NULL;
1091
1092         node = bt->mgr->pool + hashslot;
1093
1094         //  scan pool entries under hash table slot
1095
1096         do {
1097           if( node->pin )
1098                 continue;
1099           if( node->lru > target )
1100                 continue;
1101           target = node->lru;
1102           pool = node;
1103         } while( node = node->hashnext );
1104
1105         return pool;
1106 }
1107
1108 //  map new buffer pool segment to virtual memory
1109
1110 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1111 {
1112 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1113 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1114 int flag;
1115
1116 #ifdef unix
1117         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1118         pool->map = mmap (0, (bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED | MAP_POPULATE, bt->mgr->idx, off);
1119         if( pool->map == MAP_FAILED )
1120                 return bt->err = BTERR_map;
1121
1122 #else
1123         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1124         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1125         if( !pool->hmap )
1126                 return bt->err = BTERR_map;
1127
1128         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1129         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1130         if( !pool->map )
1131                 return bt->err = BTERR_map;
1132 #endif
1133         return bt->err = 0;
1134 }
1135
1136 //      calculate page within pool
1137
1138 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1139 {
1140 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1141 BtPage page;
1142
1143         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1144         return page;
1145 }
1146
1147 //  release pool pin
1148
1149 void bt_unpinpool (BtPool *pool)
1150 {
1151 #ifdef unix
1152         __sync_fetch_and_add(&pool->pin, -1);
1153 #else
1154         _InterlockedDecrement16 (&pool->pin);
1155 #endif
1156 }
1157
1158 //      find or place requested page in segment-pool
1159 //      return pool table entry, incrementing pin
1160
1161 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1162 {
1163 BtPool *pool, *node, *next;
1164 uint slot, idx, victim;
1165
1166         //      lock hash table chain
1167
1168         idx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1169         bt_spinreadlock (&bt->mgr->latch[idx], 1);
1170
1171         //      look up in hash table
1172
1173         if( pool = bt_findpool(bt, page_no, idx) ) {
1174 #ifdef unix
1175                 __sync_fetch_and_add(&pool->pin, 1);
1176 #else
1177                 _InterlockedIncrement16 (&pool->pin);
1178 #endif
1179                 bt_spinreleaseread (&bt->mgr->latch[idx], 1);
1180                 pool->lru++;
1181                 return pool;
1182         }
1183
1184         //      upgrade to write lock
1185
1186         bt_spinreleaseread (&bt->mgr->latch[idx], 1);
1187         bt_spinwritelock (&bt->mgr->latch[idx], 1);
1188
1189         // try to find page in pool with write lock
1190
1191         if( pool = bt_findpool(bt, page_no, idx) ) {
1192 #ifdef unix
1193                 __sync_fetch_and_add(&pool->pin, 1);
1194 #else
1195                 _InterlockedIncrement16 (&pool->pin);
1196 #endif
1197                 bt_spinreleasewrite (&bt->mgr->latch[idx], 1);
1198                 pool->lru++;
1199                 return pool;
1200         }
1201
1202         // allocate a new pool node
1203         // and add to hash table
1204
1205 #ifdef unix
1206         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1207 #else
1208         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1209 #endif
1210
1211         if( ++slot < bt->mgr->poolmax ) {
1212                 pool = bt->mgr->pool + slot;
1213                 pool->slot = slot;
1214
1215                 if( bt_mapsegment(bt, pool, page_no) )
1216                         return NULL;
1217
1218                 bt_linkhash(bt, pool, page_no, idx);
1219 #ifdef unix
1220                 __sync_fetch_and_add(&pool->pin, 1);
1221 #else
1222                 _InterlockedIncrement16 (&pool->pin);
1223 #endif
1224                 bt_spinreleasewrite (&bt->mgr->latch[idx], 1);
1225                 return pool;
1226         }
1227
1228         // pool table is full
1229         //      find best pool entry to evict
1230
1231 #ifdef unix
1232         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1233 #else
1234         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1235 #endif
1236
1237         while( 1 ) {
1238 #ifdef unix
1239                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1240 #else
1241                 victim = _InterlockedIncrement16 (&bt->mgr->evicted) - 1;
1242 #endif
1243                 victim %= bt->mgr->hashsize;
1244
1245                 // try to get write lock
1246                 //      skip entry if not obtained
1247
1248                 if( !bt_spinwritetry (&bt->mgr->latch[victim]) )
1249                         continue;
1250
1251                 //  if pool entry is empty
1252                 //      or any pages are pinned
1253                 //      skip this entry
1254
1255                 if( !(pool = bt_findlru(bt, bt->mgr->hash[victim])) ) {
1256                         bt_spinreleasewrite (&bt->mgr->latch[victim], 1);
1257                         continue;
1258                 }
1259
1260                 // unlink victim pool node from hash table
1261
1262                 if( node = pool->hashprev )
1263                         node->hashnext = pool->hashnext;
1264                 else if( node = pool->hashnext )
1265                         bt->mgr->hash[victim] = node->slot;
1266                 else
1267                         bt->mgr->hash[victim] = 0;
1268
1269                 if( node = pool->hashnext )
1270                         node->hashprev = pool->hashprev;
1271
1272                 bt_spinreleasewrite (&bt->mgr->latch[victim], 1);
1273
1274                 //      remove old file mapping
1275 #ifdef unix
1276                 munmap (pool->map, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1277 #else
1278                 FlushViewOfFile(pool->map, 0);
1279                 UnmapViewOfFile(pool->map);
1280                 CloseHandle(pool->hmap);
1281 #endif
1282                 pool->map = NULL;
1283
1284                 //  create new pool mapping
1285                 //  and link into hash table
1286
1287                 if( bt_mapsegment(bt, pool, page_no) )
1288                         return NULL;
1289
1290                 bt_linkhash(bt, pool, page_no, idx);
1291 #ifdef unix
1292                 __sync_fetch_and_add(&pool->pin, 1);
1293 #else
1294                 _InterlockedIncrement16 (&pool->pin);
1295 #endif
1296                 bt_spinreleasewrite (&bt->mgr->latch[idx], 1);
1297                 return pool;
1298         }
1299 }
1300
1301 // place write, read, or parent lock on requested page_no.
1302
1303 void bt_lockpage(BtLock mode, BtLatchSet *set)
1304 {
1305         switch( mode ) {
1306         case BtLockRead:
1307                 bt_spinreadlock (set->readwr, 0);
1308                 break;
1309         case BtLockWrite:
1310                 bt_spinwritelock (set->readwr, 0);
1311                 break;
1312         case BtLockAccess:
1313                 bt_spinreadlock (set->access, 0);
1314                 break;
1315         case BtLockDelete:
1316                 bt_spinwritelock (set->access, 0);
1317                 break;
1318         case BtLockParent:
1319                 bt_spinwritelock (set->parent, 0);
1320                 break;
1321         }
1322 }
1323
1324 // remove write, read, or parent lock on requested page
1325
1326 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1327 {
1328         switch( mode ) {
1329         case BtLockRead:
1330                 bt_spinreleaseread (set->readwr, 0);
1331                 break;
1332         case BtLockWrite:
1333                 bt_spinreleasewrite (set->readwr, 0);
1334                 break;
1335         case BtLockAccess:
1336                 bt_spinreleaseread (set->access, 0);
1337                 break;
1338         case BtLockDelete:
1339                 bt_spinreleasewrite (set->access, 0);
1340                 break;
1341         case BtLockParent:
1342                 bt_spinreleasewrite (set->parent, 0);
1343                 break;
1344         }
1345 }
1346
1347 //      allocate a new page and write page into it
1348
1349 uid bt_newpage(BtDb *bt, BtPage page)
1350 {
1351 BtPageSet set[1];
1352 uid new_page;
1353 int reuse;
1354
1355         //      lock allocation page
1356
1357         bt_spinwritelock(bt->mgr->latchmgr->lock, 0);
1358
1359         // use empty chain first
1360         // else allocate empty page
1361
1362         if( new_page = bt_getid(bt->mgr->latchmgr->alloc[1].right) ) {
1363                 if( set->pool = bt_pinpool (bt, new_page) )
1364                         set->page = bt_page (bt, set->pool, new_page);
1365                 else
1366                         return 0;
1367
1368                 bt_putid(bt->mgr->latchmgr->alloc[1].right, bt_getid(set->page->right));
1369                 bt_unpinpool (set->pool);
1370                 reuse = 1;
1371         } else {
1372                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1373                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1374                 reuse = 0;
1375         }
1376 #ifdef unix
1377         if ( pwrite(bt->mgr->idx, page, bt->mgr->page_size, new_page << bt->mgr->page_bits) < bt->mgr->page_size )
1378                 return bt->err = BTERR_wrt, 0;
1379
1380         // if writing first page of pool block, zero last page in the block
1381
1382         if ( !reuse && bt->mgr->poolmask > 0 && (new_page & bt->mgr->poolmask) == 0 )
1383         {
1384                 // use zero buffer to write zeros
1385                 if ( pwrite(bt->mgr->idx,bt->zero, bt->mgr->page_size, (new_page | bt->mgr->poolmask) << bt->mgr->page_bits) < bt->mgr->page_size )
1386                         return bt->err = BTERR_wrt, 0;
1387         }
1388 #else
1389         //      bring new page into pool and copy page.
1390         //      this will extend the file into the new pages.
1391
1392         if( set->pool = bt_pinpool (bt, new_page) )
1393                 set->page = bt_page (bt, set->pool, new_page);
1394         else
1395                 return 0;
1396
1397         memcpy(set->page, page, bt->mgr->page_size);
1398         bt_unpinpool (set->pool);
1399 #endif
1400         // unlock allocation latch and return new page no
1401
1402         bt_spinreleasewrite(bt->mgr->latchmgr->lock, 0);
1403         return new_page;
1404 }
1405
1406 //  find slot in page for given key at a given level
1407
1408 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1409 {
1410 uint diff, higher = set->page->cnt, low = 1, slot;
1411
1412         //        make stopper key an infinite fence value
1413
1414         if( bt_getid (set->page->right) )
1415                 higher++;
1416
1417         //      low is the lowest candidate.
1418         //  loop ends when they meet
1419
1420         //  higher is already
1421         //      tested as .ge. the given key.
1422
1423         while( diff = higher - low ) {
1424                 slot = low + ( diff >> 1 );
1425                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1426                         low = slot + 1;
1427                 else
1428                         higher = slot;
1429         }
1430
1431         if( higher <= set->page->cnt )
1432                 return higher;
1433
1434         //      if leaf page, compare against fence value
1435
1436         //      return zero if key is on right link page
1437         //      or return slot beyond last key
1438
1439         if( set->page->lvl || keycmp ((BtKey)set->page->fence, key, len) < 0 )
1440                 return 0;
1441
1442         return higher;
1443 }
1444
1445 //  find and load page at given level for given key
1446 //      leave page rd or wr locked as requested
1447
1448 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, uint lock)
1449 {
1450 uid page_no = ROOT_page, prevpage = 0;
1451 uint drill = 0xff, slot;
1452 BtLatchSet *prevlatch;
1453 uint mode, prevmode;
1454 BtPool *prevpool;
1455
1456   //  start at root of btree and drill down
1457
1458   do {
1459         // determine lock mode of drill level
1460         mode = (lock == BtLockWrite) && (drill == lvl) ? BtLockWrite : BtLockRead; 
1461
1462         set->latch = bt_pinlatch (bt, page_no);
1463         set->page_no = page_no;
1464
1465         // pin page contents
1466
1467         if( set->pool = bt_pinpool (bt, page_no) )
1468                 set->page = bt_page (bt, set->pool, page_no);
1469         else
1470                 return 0;
1471
1472         // obtain access lock using lock chaining with Access mode
1473
1474         if( page_no > ROOT_page )
1475           bt_lockpage(BtLockAccess, set->latch);
1476
1477         //      release & unpin parent page
1478
1479         if( prevpage ) {
1480           bt_unlockpage(prevmode, prevlatch);
1481           bt_unpinlatch (prevlatch);
1482           bt_unpinpool (prevpool);
1483           prevpage = 0;
1484         }
1485
1486         // obtain read lock using lock chaining
1487
1488         bt_lockpage(mode, set->latch);
1489
1490         if( page_no > ROOT_page )
1491           bt_unlockpage(BtLockAccess, set->latch);
1492
1493         // re-read and re-lock root after determining actual level of root
1494
1495         if( set->page->lvl != drill) {
1496                 if ( set->page_no != ROOT_page )
1497                         return bt->err = BTERR_struct, 0;
1498                         
1499                 drill = set->page->lvl;
1500
1501                 if( lock == BtLockWrite && drill == lvl ) {
1502                   bt_unlockpage(mode, set->latch);
1503                   bt_unpinlatch (set->latch);
1504                   bt_unpinpool (set->pool);
1505                   continue;
1506                 }
1507         }
1508
1509         prevpage = set->page_no;
1510         prevlatch = set->latch;
1511         prevpool = set->pool;
1512         prevmode = mode;
1513
1514         //      if page is being deleted and we should continue right
1515
1516         if( set->page->kill && set->page->goright ) {
1517                 page_no = bt_getid (set->page->right);
1518                 continue;
1519         }
1520
1521         //      otherwise, wait for deleted node to clear
1522
1523         if( set->page->kill ) {
1524                 bt_unlockpage(mode, set->latch);
1525                 bt_unpinlatch (set->latch);
1526                 bt_unpinpool (set->pool);
1527                 page_no = ROOT_page;
1528                 prevpage = 0;
1529                 drill = 0xff;
1530 #ifdef unix
1531                 sched_yield();
1532 #else
1533                 SwitchToThread();
1534 #endif
1535                 continue;
1536         }
1537         
1538         //  find key on page at this level
1539         //  and descend to requested level
1540
1541         if( slot = bt_findslot (set, key, len) ) {
1542           if( drill == lvl )
1543                 return slot;
1544
1545           if( slot > set->page->cnt )
1546                 return bt->err = BTERR_struct;
1547
1548           while( slotptr(set->page, slot)->dead )
1549                 if( slot++ < set->page->cnt )
1550                         continue;
1551                 else
1552                         return bt->err = BTERR_struct, 0;
1553
1554           page_no = bt_getid(slotptr(set->page, slot)->id);
1555           drill--;
1556           continue;
1557         }
1558
1559         //  or slide right into next page
1560
1561         page_no = bt_getid(set->page->right);
1562
1563   } while( page_no );
1564
1565   // return error on end of right chain
1566
1567   bt->err = BTERR_struct;
1568   return 0;     // return error
1569 }
1570
1571 // drill down fixing fence values for left sibling tree
1572
1573 //  call with set write locked
1574 //      return with set unlocked & unpinned.
1575
1576 BTERR bt_fixfences (BtDb *bt, BtPageSet *set, unsigned char *newfence)
1577 {
1578 unsigned char oldfence[256];
1579 BtPageSet next[1];
1580 int chk;
1581
1582   memcpy (oldfence, set->page->fence, 256);
1583   next->page_no = bt_getid(slotptr(set->page, set->page->cnt)->id);
1584
1585   while( !set->page->kill && set->page->lvl ) {
1586         next->latch = bt_pinlatch (bt, next->page_no);
1587         bt_lockpage (BtLockParent, next->latch);
1588         bt_lockpage (BtLockAccess, next->latch);
1589         bt_lockpage (BtLockWrite, next->latch);
1590         bt_unlockpage (BtLockAccess, next->latch);
1591
1592         if( next->pool = bt_pinpool (bt, next->page_no) )
1593                 next->page = bt_page (bt, next->pool, next->page_no);
1594         else
1595                 return bt->err;
1596
1597         chk = keycmp ((BtKey)next->page->fence, oldfence + 1, *oldfence);
1598
1599         if( chk < 0 ) {
1600           next->page_no = bt_getid (next->page->right);
1601           bt_unlockpage (BtLockWrite, next->latch);
1602           bt_unlockpage (BtLockParent, next->latch);
1603           bt_unpinlatch (next->latch);
1604           bt_unpinpool (next->pool);
1605           continue;
1606         }
1607
1608         if( chk > 0 )
1609                 return bt->err = BTERR_struct;
1610
1611         if( bt_fixfences (bt, next, newfence) )
1612                 return bt->err;
1613
1614         break;
1615   }
1616
1617   memcpy (set->page->fence, newfence, 256);
1618
1619   bt_unlockpage (BtLockWrite, set->latch);
1620   bt_unlockpage (BtLockParent, set->latch);
1621   bt_unpinlatch (set->latch);
1622   bt_unpinpool (set->pool);
1623   return 0;
1624 }
1625
1626 //      return page to free list
1627 //      page must be delete & write locked
1628
1629 void bt_freepage (BtDb *bt, BtPageSet *set)
1630 {
1631         //      lock allocation page
1632
1633         bt_spinwritelock (bt->mgr->latchmgr->lock, 0);
1634
1635         //      store chain in second right
1636         bt_putid(set->page->right, bt_getid(bt->mgr->latchmgr->alloc[1].right));
1637         bt_putid(bt->mgr->latchmgr->alloc[1].right, set->page_no);
1638         set->page->free = 1;
1639
1640         // unlock released page
1641
1642         bt_unlockpage (BtLockDelete, set->latch);
1643         bt_unlockpage (BtLockWrite, set->latch);
1644         bt_unpinlatch (set->latch);
1645         bt_unpinpool (set->pool);
1646
1647         // unlock allocation page
1648
1649         bt_spinreleasewrite (bt->mgr->latchmgr->lock, 0);
1650 }
1651
1652 //      remove the root level by promoting its only child
1653 //      call with parent and child pages
1654
1655 BTERR bt_removeroot (BtDb *bt, BtPageSet *root, BtPageSet *child)
1656 {
1657 uid next = 0;
1658
1659   do {
1660         if( next ) {
1661           child->latch = bt_pinlatch (bt, next);
1662           bt_lockpage (BtLockDelete, child->latch);
1663           bt_lockpage (BtLockWrite, child->latch);
1664
1665           if( child->pool = bt_pinpool (bt, next) )
1666                 child->page = bt_page (bt, child->pool, next);
1667           else
1668                 return bt->err;
1669
1670           child->page_no = next;
1671         }
1672
1673         memcpy (root->page, child->page, bt->mgr->page_size);
1674         next = bt_getid (slotptr(child->page, child->page->cnt)->id);
1675         bt_freepage (bt, child);
1676   } while( root->page->lvl > 1 && root->page->cnt == 1 );
1677
1678   bt_unlockpage (BtLockWrite, root->latch);
1679   bt_unpinlatch (root->latch);
1680   bt_unpinpool (root->pool);
1681   return 0;
1682 }
1683
1684 //  pull right page over ourselves in simple merge
1685
1686 BTERR bt_mergeright (BtDb *bt, BtPageSet *set, BtPageSet *parent, BtPageSet *right, uint slot, uint idx)
1687 {
1688         //  install ourselves as child page
1689         //      and delete ourselves from parent
1690
1691         bt_putid (slotptr(parent->page, idx)->id, set->page_no);
1692         slotptr(parent->page, slot)->dead = 1;
1693         parent->page->act--;
1694
1695         //      collapse any empty slots
1696
1697         while( idx = parent->page->cnt - 1 )
1698           if( slotptr(parent->page, idx)->dead ) {
1699                 *slotptr(parent->page, idx) = *slotptr(parent->page, idx + 1);
1700                 memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
1701           } else
1702                   break;
1703
1704         memcpy (set->page, right->page, bt->mgr->page_size);
1705         bt_unlockpage (BtLockParent, right->latch);
1706
1707         bt_freepage (bt, right);
1708
1709         //      do we need to remove a btree level?
1710         //      (leave the first page of leaves alone)
1711
1712         if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
1713           if( set->page->lvl )
1714                 return bt_removeroot (bt, parent, set);
1715
1716         bt_unlockpage (BtLockWrite, parent->latch);
1717         bt_unlockpage (BtLockDelete, set->latch);
1718         bt_unlockpage (BtLockWrite, set->latch);
1719         bt_unpinlatch (parent->latch);
1720         bt_unpinpool (parent->pool);
1721         bt_unpinlatch (set->latch);
1722         bt_unpinpool (set->pool);
1723         return 0;
1724 }
1725
1726 //      remove both child and parent from the btree
1727 //      from the fence position in the parent
1728 //      call with both pages locked for writing
1729
1730 BTERR bt_removeparent (BtDb *bt, BtPageSet *child, BtPageSet *parent, BtPageSet *right, BtPageSet *rparent, uint lvl)
1731 {
1732 unsigned char pagefence[256];
1733 uint idx;
1734
1735         //  pull right sibling over ourselves and unlock
1736
1737         memcpy (child->page, right->page, bt->mgr->page_size);
1738
1739         bt_unlockpage (BtLockWrite, child->latch);
1740         bt_unpinlatch (child->latch);
1741         bt_unpinpool (child->pool);
1742
1743         //  install ourselves into right link of old right page
1744
1745         bt_putid (right->page->right, child->page_no);
1746         right->page->goright = 1;       // tell bt_loadpage to go right to us
1747         right->page->kill = 1;
1748
1749         bt_unlockpage (BtLockWrite, right->latch);
1750
1751         //      remove our slot from our parent
1752         //      signal to move right
1753
1754         parent->page->goright = 1;      // tell bt_loadpage to go right to rparent
1755         parent->page->kill = 1;
1756         parent->page->act--;
1757
1758         //      redirect right page pointer in right parent to us
1759
1760         for( idx = 0; idx++ < rparent->page->cnt; )
1761           if( !slotptr(rparent->page, idx)->dead )
1762                 break;
1763
1764         if( bt_getid (slotptr(rparent->page, idx)->id) != right->page_no )
1765                 return bt->err = BTERR_struct;
1766
1767         bt_putid (slotptr(rparent->page, idx)->id, child->page_no);
1768         bt_unlockpage (BtLockWrite, rparent->latch);
1769         bt_unpinlatch (rparent->latch);
1770         bt_unpinpool (rparent->pool);
1771
1772         //      free the right page
1773
1774         bt_lockpage (BtLockDelete, right->latch);
1775         bt_lockpage (BtLockWrite, right->latch);
1776         bt_freepage (bt, right);
1777
1778         //      save parent page fence value
1779
1780         memcpy (pagefence, parent->page->fence, 256);
1781         bt_unlockpage (BtLockWrite, parent->latch);
1782
1783         return bt_removepage (bt, parent, lvl, pagefence);
1784 }
1785
//      bt_removepage
//
//      Unlink the page described by `set` (a page at level lvl) from
//      its parent at level lvl+1.  Depending on where the page sits in
//      its parent, the work is finished by bt_mergeright, by
//      bt_removeparent, or here by redirecting the parent slot to the
//      left sibling and calling bt_freepage on the page.
//
//      pagefence is the page's saved fence key: byte 0 holds the key
//      length and the key bytes follow (see the bt_loadpage call).
//
//      Each pass of the loop re-loads the parent.  Whenever a neighbor
//      is not linked the expected way yet, or is itself being killed,
//      the locks and pins taken in that pass are released, the thread
//      yields, and the pass is retried.
//
//      Returns 0 on success, otherwise a BTERR code.
//
//      NOTE(review): the `return bt->err` paths taken after a failed
//      bt_pinpool return with the locks and pins acquired above them
//      still held.
//      NOTE(review): `ptr` is declared but never used in this function.
//      NOTE(review): the yield call is selected with #ifdef linux, so
//      every non-linux build (other unix systems included) compiles
//      the SwitchToThread() branch -- confirm that is intended.
1786 //      remove page from btree
1787 //      call with page unlocked
1788 //      returns with page on free list
1789
1790 BTERR bt_removepage (BtDb *bt, BtPageSet *set, uint lvl, unsigned char *pagefence)
1791 {
1792 BtPageSet parent[1], sibling[1], rparent[1];
1793 unsigned char newfence[256];
1794 uint slot, idx;
1795 BtKey ptr;
1796
1797         //      load and write-lock our parent, located by our fence key
1798
1799   while( 1 ) {
1800         if( !(slot = bt_loadpage (bt, parent, pagefence+1, *pagefence, lvl+1, BtLockWrite)) )
1801                 return bt->err;
1802
1803         //      do we show up in our parent yet?
        //      if the slot found does not point at us, release the parent and retry
1804
1805         if( set->page_no != bt_getid (slotptr (parent->page, slot)->id) ) {
1806                 bt_unlockpage (BtLockWrite, parent->latch);
1807                 bt_unpinlatch (parent->latch);
1808                 bt_unpinpool (parent->pool);
1809 #ifdef linux
1810                 sched_yield();
1811 #else
1812                 SwitchToThread();
1813 #endif
1814                 continue;
1815         }
1816
1817         //      can we do a simple merge entirely
1818         //      between siblings on the parent page?
        //      (true when ours is not the last slot of the parent)
1819
1820         if( slot < parent->page->cnt ) {
1821                 // find our right neighbor
1822                 //      right must exist because the stopper prevents
1823                 //      the rightmost page from deleting
                //      idx ends on the first slot after ours that is not dead
1824
1825                 for( idx = slot; idx++ < parent->page->cnt; )
1826                   if( !slotptr(parent->page, idx)->dead )
1827                         break;
1828
1829                 sibling->page_no = bt_getid (slotptr (parent->page, idx)->id);
1830
1831                 bt_lockpage (BtLockDelete, set->latch);
1832                 bt_lockpage (BtLockWrite, set->latch);
1833
1834                 //      merge right if sibling shows up in
1835                 //  our parent and is not being killed
                //      i.e. the parent's next live slot and our own right link agree
1836
1837                 if( sibling->page_no == bt_getid (set->page->right) ) {
1838                   sibling->latch = bt_pinlatch (bt, sibling->page_no);
1839                   bt_lockpage (BtLockParent, sibling->latch);
1840                   bt_lockpage (BtLockDelete, sibling->latch);
1841                   bt_lockpage (BtLockWrite, sibling->latch);
1842
1843                   if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
1844                         sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
1845                   else
1846                         return bt->err;
1847
                  //  bt_mergeright is handed every lock and pin taken so far
1848                   if( !sibling->page->kill )
1849                         return bt_mergeright(bt, set, parent, sibling, slot, idx);
1850
1851                   //  try again later
1852
1853                   bt_unlockpage (BtLockWrite, sibling->latch);
1854                   bt_unlockpage (BtLockParent, sibling->latch);
1855                   bt_unlockpage (BtLockDelete, sibling->latch);
1856                   bt_unpinlatch (sibling->latch);
1857                   bt_unpinpool (sibling->pool);
1858                 }
1859
                //      back off: release our own page and the parent, then retry
1860                 bt_unlockpage (BtLockDelete, set->latch);
1861                 bt_unlockpage (BtLockWrite, set->latch);
1862                 bt_unlockpage (BtLockWrite, parent->latch);
1863                 bt_unpinlatch (parent->latch);
1864                 bt_unpinpool (parent->pool);
1865 #ifdef linux
1866                 sched_yield();
1867 #else
1868                 SwitchToThread();
1869 #endif
1870                 continue;
1871         }
1872
1873         //  find our left neighbor in our parent page
        //  idx ends at the nearest live slot below ours, or 0 if there is none
1874
1875         for( idx = slot; --idx; )
1876           if( !slotptr(parent->page, idx)->dead )
1877                 break;
1878
1879         //      if no left neighbor, delete ourselves and our parent
        //      this needs our page, the parent's right neighbor (rparent)
        //      and our own right sibling all write-locked
1880
1881         if( !idx ) {
1882                 bt_lockpage (BtLockAccess, set->latch);
1883                 bt_lockpage (BtLockWrite, set->latch);
1884                 bt_unlockpage (BtLockAccess, set->latch);
1885
1886                 rparent->page_no = bt_getid (parent->page->right);
1887                 rparent->latch = bt_pinlatch (bt, rparent->page_no);
1888
1889                 bt_lockpage (BtLockAccess, rparent->latch);
1890                 bt_lockpage (BtLockWrite, rparent->latch);
1891                 bt_unlockpage (BtLockAccess, rparent->latch);
1892
1893                 if( rparent->pool = bt_pinpool (bt, rparent->page_no) )
1894                         rparent->page = bt_page (bt, rparent->pool, rparent->page_no);
1895                 else
1896                         return bt->err;
1897
1898                 if( !rparent->page->kill ) {
1899                   sibling->page_no = bt_getid (set->page->right);
1900                   sibling->latch = bt_pinlatch (bt, sibling->page_no);
1901
1902                   bt_lockpage (BtLockAccess, sibling->latch);
1903                   bt_lockpage (BtLockWrite, sibling->latch);
1904                   bt_unlockpage (BtLockAccess, sibling->latch);
1905
1906                   if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
1907                         sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
1908                   else
1909                         return bt->err;
1910
1911                   if( !sibling->page->kill )
1912                         return bt_removeparent (bt, set, parent, sibling, rparent, lvl+1);
1913
1914                   //  try again later
1915
1916                   bt_unlockpage (BtLockWrite, sibling->latch);
1917                   bt_unpinlatch (sibling->latch);
1918                   bt_unpinpool (sibling->pool);
1919                 }
1920
                //      back off: release our page, rparent and parent, then retry
1921                 bt_unlockpage (BtLockWrite, set->latch);
1922                 bt_unlockpage (BtLockWrite, rparent->latch);
1923                 bt_unpinlatch (rparent->latch);
1924                 bt_unpinpool (rparent->pool);
1925
1926                 bt_unlockpage (BtLockWrite, parent->latch);
1927                 bt_unpinlatch (parent->latch);
1928                 bt_unpinpool (parent->pool);
1929 #ifdef linux
1930                 sched_yield();
1931 #else
1932                 SwitchToThread();
1933 #endif
1934                 continue;
1935         }
1936
1937         // redirect parent to our left sibling
1938         // lock and map our left sibling's page
1939
1940         sibling->page_no = bt_getid (slotptr(parent->page, idx)->id);
1941         sibling->latch = bt_pinlatch (bt, sibling->page_no);
1942
1943         //      wait our turn on fence key maintenance
1944
1945         bt_lockpage(BtLockParent, sibling->latch);
1946         bt_lockpage(BtLockAccess, sibling->latch);
1947         bt_lockpage(BtLockWrite, sibling->latch);
1948         bt_unlockpage(BtLockAccess, sibling->latch);
1949
1950         if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
1951                 sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
1952         else
1953                 return bt->err;
1954
1955         //  wait until left sibling is in our parent
        //  (its right link must point directly at us; otherwise back off and retry)
1956
1957         if( bt_getid (sibling->page->right) != set->page_no ) {
1958                 bt_unlockpage (BtLockWrite, parent->latch);
1959                 bt_unlockpage (BtLockWrite, sibling->latch);
1960                 bt_unlockpage (BtLockParent, sibling->latch);
1961                 bt_unpinlatch (parent->latch);
1962                 bt_unpinpool (parent->pool);
1963                 bt_unpinlatch (sibling->latch);
1964                 bt_unpinpool (sibling->pool);
1965 #ifdef linux
1966                 sched_yield();
1967 #else
1968                 SwitchToThread();
1969 #endif
1970                 continue;
1971         }
1972
1973         //      delete our left sibling from parent
1974
1975         slotptr(parent->page,idx)->dead = 1;
1976         parent->page->dirty = 1;
1977         parent->page->act--;
1978
1979         //      redirect our parent slot to our left sibling
        //      and let the sibling inherit our right link
1980
1981         bt_putid (slotptr(parent->page, slot)->id, sibling->page_no);
1982         memcpy (sibling->page->right, set->page->right, BtId);
1983
1984         //      collapse dead slots from parent
        //      while the next-to-last slot is dead, move the last slot
        //      down over it and shrink cnt by one
1985
1986         while( idx = parent->page->cnt - 1 )
1987           if( slotptr(parent->page, idx)->dead ) {
1988                 *slotptr(parent->page, idx) = *slotptr(parent->page, parent->page->cnt);
1989                 memset (slotptr(parent->page, parent->page->cnt--), 0, sizeof(BtSlot));
1990           } else
1991                   break;
1992
1993         //  free our original page
1994
1995         bt_lockpage (BtLockDelete, set->latch);
1996         bt_lockpage (BtLockWrite, set->latch);
1997         bt_freepage (bt, set);
1998
1999         //      go down the left node's fence keys to the leaf level
2000         //      and update the fence keys in each page
        //      the new fence value is a copy of the parent page's fence field
2001
2002         memcpy (newfence, parent->page->fence, 256);
2003
2004         if( bt_fixfences (bt, sibling, newfence) )
2005                 return bt->err;
2006
2007         //  promote sibling as new root?
        //  only when the root is left with a single slot and the sibling is not a leaf;
        //  the sibling is pinned, locked and mapped again before bt_removeroot is called
2008
2009         if( parent->page_no == ROOT_page && parent->page->cnt == 1 )
2010          if( sibling->page->lvl ) {
2011           sibling->latch = bt_pinlatch (bt, sibling->page_no);
2012           bt_lockpage (BtLockDelete, sibling->latch);
2013           bt_lockpage (BtLockWrite, sibling->latch);
2014
2015           if( sibling->pool = bt_pinpool (bt, sibling->page_no) )
2016                 sibling->page = bt_page (bt, sibling->pool, sibling->page_no);
2017           else
2018                 return bt->err;
2019
2020           return bt_removeroot (bt, parent, sibling);
2021          }
2022
        //  normal completion: release the parent
2023         bt_unlockpage (BtLockWrite, parent->latch);
2024         bt_unpinlatch (parent->latch);
2025         bt_unpinpool (parent->pool);
2026
2027         return 0;
2028   }
2029 }
2030
2031 //  find and delete key on page by marking delete flag bit
2032 //  if page becomes empty, delete it from the btree
2033
2034 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len)
2035 {
2036 unsigned char pagefence[256];
2037 uint slot, idx, found;
2038 BtPageSet set[1];
2039 BtKey ptr;
2040
2041         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockWrite) )
2042                 ptr = keyptr(set->page, slot);
2043         else
2044                 return bt->err;
2045
2046         // if key is found delete it, otherwise ignore request
2047
2048         if( found = slot <= set->page->cnt )
2049           if( found = !keycmp (ptr, key, len) )
2050                 if( found = slotptr(set->page, slot)->dead == 0 ) {
2051                   slotptr(set->page,slot)->dead = 1;
2052                   set->page->dirty = 1;
2053                   set->page->act--;
2054
2055                   // collapse empty slots
2056
2057                   while( idx = set->page->cnt - 1 )
2058                         if( slotptr(set->page, idx)->dead ) {
2059                           *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2060                           memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2061                         } else
2062                                 break;
2063                 }
2064
2065         if( set->page->act ) {
2066                 bt_unlockpage(BtLockWrite, set->latch);
2067                 bt_unpinlatch (set->latch);
2068                 bt_unpinpool (set->pool);
2069                 return bt->found = found, 0;
2070         }
2071
2072         memcpy (pagefence, set->page->fence, 256);
2073         set->page->kill = 1;
2074
2075         bt_unlockpage (BtLockWrite, set->latch);
2076
2077         if( bt_removepage (bt, set, 0, pagefence) )
2078                 return bt->err;
2079
2080         bt->found = found;
2081         return 0;
2082 }
2083
2084 //      find key in leaf level and return row-id
2085
2086 uid bt_findkey (BtDb *bt, unsigned char *key, uint len)
2087 {
2088 BtPageSet set[1];
2089 uint  slot;
2090 uid id = 0;
2091 BtKey ptr;
2092
2093         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2094                 ptr = keyptr(set->page, slot);
2095         else
2096                 return 0;
2097
2098         // if key exists, return row-id
2099         //      otherwise return 0
2100
2101         if( slot <= set->page->cnt )
2102           if( !keycmp (ptr, key, len) )
2103                 id = bt_getid(slotptr(set->page,slot)->id);
2104
2105         bt_unlockpage (BtLockRead, set->latch);
2106         bt_unpinlatch (set->latch);
2107         bt_unpinpool (set->pool);
2108         return id;
2109 }
2110
2111 //      check page for space available,
2112 //      clean if necessary and return
2113 //      0 - page needs splitting
2114 //      >0  new slot value
2115
2116 uint bt_cleanpage(BtDb *bt, BtPage page, uint amt, uint slot)
2117 {
2118 uint nxt = bt->mgr->page_size, off;
2119 uint cnt = 0, idx = 0;
2120 uint max = page->cnt;
2121 uint newslot = max;
2122 BtKey key;
2123
2124         if( page->min >= (max+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
2125                 return slot;
2126
2127         //      skip cleanup if nothing to reclaim
2128
2129         if( !page->dirty )
2130                 return 0;
2131
2132         memcpy (bt->frame, page, bt->mgr->page_size);
2133
2134         // skip page info and set rest of page to zero
2135
2136         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2137         page->dirty = 0;
2138         page->act = 0;
2139
2140         // try cleaning up page first
2141         // by removing deleted keys
2142
2143         while( cnt++ < max ) {
2144                 if( cnt == slot )
2145                         newslot = idx + 1;
2146                 if( slotptr(bt->frame,cnt)->dead )
2147                         continue;
2148
2149                 // if its not the fence key,
2150                 // copy the key across
2151
2152                 off = slotptr(bt->frame,cnt)->off;
2153
2154                 if( off >= sizeof(*page) ) {
2155                         key = keyptr(bt->frame, cnt);
2156                         off = nxt -= key->len + 1;
2157                         memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2158                 }
2159
2160                 // copy slot
2161
2162                 memcpy(slotptr(page, ++idx)->id, slotptr(bt->frame, cnt)->id, BtId);
2163                 slotptr(page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2164                 slotptr(page, idx)->off = off;
2165                 page->act++;
2166         }
2167
2168         page->min = nxt;
2169         page->cnt = idx;
2170
2171         //      see if page has enough space now, or does it need splitting?
2172
2173         if( page->min >= (idx+1) * sizeof(BtSlot) + sizeof(*page) + amt + 1 )
2174                 return newslot;
2175
2176         return 0;
2177 }
2178
2179 // split the root and raise the height of the btree
2180
2181 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, uid page_no2)
2182 {
2183 uint nxt = bt->mgr->page_size;
2184 unsigned char leftkey[256];
2185 uid new_page;
2186
2187         //  Obtain an empty page to use, and copy the current
2188         //  root contents into it, e.g. lower keys
2189
2190         memcpy (leftkey, root->page->fence, 256);
2191         root->page->posted = 1;
2192
2193         if( !(new_page = bt_newpage(bt, root->page)) )
2194                 return bt->err;
2195
2196         // preserve the page info at the bottom
2197         // of higher keys and set rest to zero
2198
2199         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2200         memset(root->page->fence, 0, 256);
2201         root->page->fence[0] = 2;
2202         root->page->fence[1] = 0xff;
2203         root->page->fence[2] = 0xff;
2204
2205         // insert lower keys page fence key on newroot page
2206
2207         nxt -= *leftkey + 1;
2208         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
2209         bt_putid(slotptr(root->page, 1)->id, new_page);
2210         slotptr(root->page, 1)->off = nxt;
2211         
2212         // insert stopper key on newroot page
2213         // and increase the root height
2214
2215         bt_putid(slotptr(root->page, 2)->id, page_no2);
2216         slotptr(root->page, 2)->off = offsetof(struct BtPage_, fence);
2217
2218         bt_putid(root->page->right, 0);
2219         root->page->min = nxt;          // reset lowest used offset and key count
2220         root->page->cnt = 2;
2221         root->page->act = 2;
2222         root->page->lvl++;
2223
2224         // release and unpin root
2225
2226         bt_unlockpage(BtLockWrite, root->latch);
2227         bt_unpinlatch (root->latch);
2228         bt_unpinpool (root->pool);
2229         return 0;
2230 }
2231
//  bt_splitpage
//
//  Splits the page in `set`: slots max/2+1 .. max are assembled in
//  bt->frame and written to a new page obtained from bt_newpage;
//  slots 1 .. max/2 are rebuilt in place.  The old page's fence
//  becomes its last remaining key and its right link is pointed at
//  the new page.
//
//  If the page is the root, bt_splitroot finishes the job.  Otherwise
//  the fence key of this page, and then of each page reached through
//  the right links, is inserted at level lvl+1 with bt_insertkey until
//  a page already flagged `posted` is met or a right link is zero.
//
//  On pages above the leaf level (lvl != 0) the last slot of each half
//  gets the offset of the page's fence field rather than a copy of the
//  key text in the key area.
//
//  Returns 0 on success, otherwise the BTERR code in bt->err.
//  NOTE(review): the `return bt->err` after a failed bt_insertkey
//  leaves BtLockParent held and the page pinned.
2232 //  split already locked full node
2233 //      return unlocked.
2234
2235 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2236 {
2237 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size, off;
2238 unsigned char fencekey[256];
2239 uint lvl = set->page->lvl;
2240 uid right;
2241 BtKey key;
2242
2243         //  split higher half of keys to bt->frame
2244
2245         memset (bt->frame, 0, bt->mgr->page_size);
2246         max = set->page->cnt;
2247         cnt = max / 2;
2248         idx = 0;
2249
        //  key text is packed downward from the end of the frame (nxt);
        //  dead slots are carried over as live entries (no dead test here)
2250         while( cnt++ < max ) {
2251                 if( !lvl || cnt < max ) {
2252                         key = keyptr(set->page, cnt);
2253                         off = nxt -= key->len + 1;
2254                         memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
2255                 } else
2256                         off = offsetof(struct BtPage_, fence);
2257
2258                 memcpy(slotptr(bt->frame,++idx)->id, slotptr(set->page,cnt)->id, BtId);
2259                 slotptr(bt->frame, idx)->tod = slotptr(set->page, cnt)->tod;
2260                 slotptr(bt->frame, idx)->off = off;
2261                 bt->frame->act++;
2262         }
2263
        //  when the root is split, the new upper page is flagged posted up front
2264         if( set->page_no == ROOT_page )
2265                 bt->frame->posted = 1;
2266
        //  the upper half inherits the old page's fence key
2267         memcpy (bt->frame->fence, set->page->fence, 256);
2268         bt->frame->bits = bt->mgr->page_bits;
2269         bt->frame->min = nxt;
2270         bt->frame->cnt = idx;
2271         bt->frame->lvl = lvl;
2272
2273         // link right node
        // (skipped for the root, whose right link is not copied)
2274
2275         if( set->page_no > ROOT_page )
2276                 memcpy (bt->frame->right, set->page->right, BtId);
2277
2278         //      get new free page and write higher keys to it.
2279
2280         if( !(right = bt_newpage(bt, bt->frame)) )
2281                 return bt->err;
2282
2283         //      update lower keys to continue in old page
        //      bt->frame is reused as a snapshot of the old page
        //      while the page itself is cleared behind its header
2284
2285         memcpy (bt->frame, set->page, bt->mgr->page_size);
2286         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2287         nxt = bt->mgr->page_size;
2288         set->page->posted = 0;
2289         set->page->dirty = 0;
2290         set->page->act = 0;
2291         cnt = 0;
2292         idx = 0;
2293
2294         //  assemble page of smaller keys
        //  `key` is fetched for every slot, so after the loop it
        //  refers to the last key of the lower half
2295
2296         while( cnt++ < max / 2 ) {
2297                 key = keyptr(bt->frame, cnt);
2298
2299                 if( !lvl || cnt < max / 2 ) {
2300                         off = nxt -= key->len + 1;
2301                         memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2302                 } else 
2303                         off = offsetof(struct BtPage_, fence);
2304
2305                 memcpy(slotptr(set->page,++idx)->id, slotptr(bt->frame,cnt)->id, BtId);
2306                 slotptr(set->page, idx)->tod = slotptr(bt->frame, cnt)->tod;
2307                 slotptr(set->page, idx)->off = off;
2308                 set->page->act++;
2309         }
2310
2311         // install fence key for smaller key page
        // (length byte plus text of the last lower-half key)
2312
2313         memset(set->page->fence, 0, 256);
2314         memcpy(set->page->fence, key, key->len + 1);
2315
2316         bt_putid(set->page->right, right);
2317         set->page->min = nxt;
2318         set->page->cnt = idx;
2319
2320         // if current page is the root page, split it
2321
2322         if( set->page_no == ROOT_page )
2323                 return bt_splitroot (bt, set, right);
2324
2325         bt_unlockpage (BtLockWrite, set->latch);
2326
2327         // insert new fences in their parent pages
        // BtLockParent is held across each bt_insertkey call; the
        // write lock is only held while the fence and right link are read
2328
2329         while( 1 ) {
2330                 bt_lockpage (BtLockParent, set->latch);
2331                 bt_lockpage (BtLockWrite, set->latch);
2332
2333                 memcpy (fencekey, set->page->fence, 256);
2334                 right = bt_getid (set->page->right);
2335
                // fence of this page is already posted: nothing left to do
2336                 if( set->page->posted ) {
2337                         bt_unlockpage (BtLockParent, set->latch);
2338                         bt_unlockpage (BtLockWrite, set->latch);
2339                         bt_unpinlatch (set->latch);
2340                         bt_unpinpool (set->pool);
2341                         return 0;
2342                 }
2343
2344                 set->page->posted = 1;
2345                 bt_unlockpage (BtLockWrite, set->latch);
2346
                // fencekey[0] is the key length, the key text follows
2347                 if( bt_insertkey (bt, fencekey+1, *fencekey, set->page_no, time(NULL), lvl+1) )
2348                         return bt->err;
2349
2350                 bt_unlockpage (BtLockParent, set->latch);
2351                 bt_unpinlatch (set->latch);
2352                 bt_unpinpool (set->pool);
2353
                // move on to the right page; a zero link ends the walk
2354                 if( !(set->page_no = right) )
2355                         break;
2356
2357                 set->latch = bt_pinlatch (bt, right);
2358
2359                 if( set->pool = bt_pinpool (bt, right) )
2360                         set->page = bt_page (bt, set->pool, right);
2361                 else
2362                         return bt->err;
2363         }
2364
2365         return 0;
2366 }
2367
2368 //  Insert new key into the btree at given level.
2369
2370 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uid id, uint tod, uint lvl)
2371 {
2372 BtPageSet set[1];
2373 uint slot, idx;
2374 BtKey ptr;
2375
2376         while( 1 ) {
2377                 if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
2378                         ptr = keyptr(set->page, slot);
2379                 else
2380                 {
2381                         if ( !bt->err )
2382                                 bt->err = BTERR_ovflw;
2383                         return bt->err;
2384                 }
2385
2386                 // if key already exists, update id and return
2387
2388                 if( slot <= set->page->cnt )
2389                   if( !keycmp (ptr, key, len) ) {
2390                         if( slotptr(set->page, slot)->dead )
2391                                 set->page->act++;
2392                         slotptr(set->page, slot)->dead = 0;
2393                         slotptr(set->page, slot)->tod = tod;
2394                         bt_putid(slotptr(set->page,slot)->id, id);
2395                         bt_unlockpage(BtLockWrite, set->latch);
2396                         bt_unpinlatch (set->latch);
2397                         bt_unpinpool (set->pool);
2398                         return 0;
2399                 }
2400
2401                 // check if page has enough space
2402
2403                 if( slot = bt_cleanpage (bt, set->page, len, slot) )
2404                         break;
2405
2406                 if( bt_splitpage (bt, set) )
2407                         return bt->err;
2408         }
2409
2410         // calculate next available slot and copy key into page
2411
2412         set->page->min -= len + 1; // reset lowest used offset
2413         ((unsigned char *)set->page)[set->page->min] = len;
2414         memcpy ((unsigned char *)set->page + set->page->min +1, key, len );
2415
2416         for( idx = slot; idx <= set->page->cnt; idx++ )
2417           if( slotptr(set->page, idx)->dead )
2418                 break;
2419
2420         // now insert key into array before slot
2421
2422         if( idx > set->page->cnt )
2423                 set->page->cnt++;
2424
2425         set->page->act++;
2426
2427         while( idx > slot )
2428                 *slotptr(set->page, idx) = *slotptr(set->page, idx -1), idx--;
2429
2430         bt_putid(slotptr(set->page,slot)->id, id);
2431         slotptr(set->page, slot)->off = set->page->min;
2432         slotptr(set->page, slot)->tod = tod;
2433         slotptr(set->page, slot)->dead = 0;
2434
2435         bt_unlockpage (BtLockWrite, set->latch);
2436         bt_unpinlatch (set->latch);
2437         bt_unpinpool (set->pool);
2438         return 0;
2439 }
2440
2441 //  cache page of keys into cursor and return starting slot for given key
2442
2443 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2444 {
2445 BtPageSet set[1];
2446 uint slot;
2447
2448         // cache page for retrieval
2449
2450         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2451           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2452         else
2453           return 0;
2454
2455         bt->cursor_page = set->page_no;
2456
2457         bt_unlockpage(BtLockRead, set->latch);
2458         bt_unpinlatch (set->latch);
2459         bt_unpinpool (set->pool);
2460         return slot;
2461 }
2462
2463 //  return next slot for cursor page
2464 //  or slide cursor right into next page
2465
2466 uint bt_nextkey (BtDb *bt, uint slot)
2467 {
2468 BtPageSet set[1];
2469 uid right;
2470
2471   do {
2472         right = bt_getid(bt->cursor->right);
2473         while( slot++ < bt->cursor->cnt )
2474           if( slotptr(bt->cursor,slot)->dead )
2475                 continue;
2476           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2477                 return slot;
2478           else
2479                 break;
2480
2481         if( !right )
2482                 break;
2483
2484         bt->cursor_page = right;
2485
2486         if( set->pool = bt_pinpool (bt, right) )
2487                 set->page = bt_page (bt, set->pool, right);
2488         else
2489                 return 0;
2490
2491         set->latch = bt_pinlatch (bt, right);
2492     bt_lockpage(BtLockRead, set->latch);
2493
2494         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2495
2496         bt_unlockpage(BtLockRead, set->latch);
2497         bt_unpinlatch (set->latch);
2498         bt_unpinpool (set->pool);
2499         slot = 0;
2500   } while( 1 );
2501
2502   return bt->err = 0;
2503 }
2504
2505 BtKey bt_key(BtDb *bt, uint slot)
2506 {
2507         return keyptr(bt->cursor, slot);
2508 }
2509
2510 uid bt_uid(BtDb *bt, uint slot)
2511 {
2512         return bt_getid(slotptr(bt->cursor,slot)->id);
2513 }
2514
2515 uint bt_tod(BtDb *bt, uint slot)
2516 {
2517         return slotptr(bt->cursor,slot)->tod;
2518 }
2519
2520
2521 #ifdef STANDALONE
2522
2523 void bt_latchaudit (BtDb *bt)
2524 {
2525 ushort idx, hashidx;
2526 BtLatchSet *latch;
2527 BtPool *pool;
2528 BtPage page;
2529 uid page_no;
2530
2531 #ifdef unix
2532         for( idx = 1; idx < bt->mgr->latchmgr->latchdeployed; idx++ ) {
2533                 latch = bt->mgr->latchsets + idx;
2534                 if( *(uint *)latch->readwr ) {
2535                         fprintf(stderr, "latchset %d r/w locked for page %.8x\n", idx, latch->page_no);
2536                         *(uint *)latch->readwr = 0;
2537                 }
2538                 if( *(uint *)latch->access ) {
2539                         fprintf(stderr, "latchset %d access locked for page %.8x\n", idx, latch->page_no);
2540                         *(uint *)latch->access = 0;
2541                 }
2542                 if( *(uint *)latch->parent ) {
2543                         fprintf(stderr, "latchset %d parent locked for page %.8x\n", idx, latch->page_no);
2544                         *(uint *)latch->parent = 0;
2545                 }
2546                 if( *(uint *)latch->busy ) {
2547                         fprintf(stderr, "latchset %d busy locked for page %.8x\n", idx, latch->page_no);
2548                         *(uint *)latch->parent = 0;
2549                 }
2550                 if( latch->pin ) {
2551                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2552                         latch->pin = 0;
2553                 }
2554         }
2555
2556         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2557           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2558                 latch = bt->mgr->latchsets + idx;
2559                 if( latch->hash != hashidx ) {
2560                         fprintf(stderr, "latchset %d wrong hashidx\n", idx);
2561                         latch->hash = hashidx;
2562                 }
2563           } while( idx = latch->next );
2564         }
2565
2566         page_no = bt_getid(bt->mgr->latchmgr->alloc[1].right);
2567
2568         while( page_no ) {
2569                 fprintf(stderr, "free: %.6x\n", (uint)page_no);
2570
2571                 if( pool = bt_pinpool (bt, page_no) )
2572                         page = bt_page (bt, pool, page_no);
2573                 else
2574                         return;
2575
2576             page_no = bt_getid(page->right);
2577                 bt_unpinpool (pool);
2578         }
2579 #endif
2580 }
2581
//  Per-thread argument block handed to index_file.
2582 typedef struct {
        //  type: command letter; index_file folds it to lower case with | 0x20
        //        and switches on it ('a' audit, 'w' write, 'd' delete,
        //        'f' find, 's' scan are the cases visible here)
        //  idx:  multiplied by num when building the numeric key suffix
2583         char type, idx;
        //  path of the key file opened with fopen by the w/d/f commands
2584         char *infile;
        //  buffer manager passed to bt_open
2585         BtMgr *mgr;
        //  key suffix mode: 0 appends nothing, 1 appends 1000000000 - line,
        //  any other value appends line + idx * num (each as 9 digits)
2586         int num;
2587 } ThreadArg;
2588
2589 //  standalone program to index file of keys
2590 //  then list them onto std-out
2591
2592 #ifdef unix
2593 void *index_file (void *arg)
2594 #else
2595 uint __stdcall index_file (void *arg)
2596 #endif
2597 {
2598 int line = 0, found = 0, cnt = 0;
2599 uid next, page_no = LEAF_page;  // start on first page of leaves
2600 unsigned char key[256];
2601 ThreadArg *args = arg;
2602 int ch, len = 0, slot;
2603 BtPageSet set[1];
2604 time_t tod[1];
2605 BtKey ptr;
2606 BtDb *bt;
2607 FILE *in;
2608
2609         bt = bt_open (args->mgr);
2610         time (tod);
2611
2612         switch(args->type | 0x20)
2613         {
2614         case 'a':
2615                 fprintf(stderr, "started latch mgr audit\n");
2616                 bt_latchaudit (bt);
2617                 fprintf(stderr, "finished latch mgr audit\n");
2618                 break;
2619
2620         case 'w':
2621                 fprintf(stderr, "started indexing for %s\n", args->infile);
2622                 if( in = fopen (args->infile, "rb") )
2623                   while( ch = getc(in), ch != EOF )
2624                         if( ch == '\n' )
2625                         {
2626                           line++;
2627
2628                           if( args->num == 1 )
2629                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2630
2631                           else if( args->num )
2632                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2633
2634                           if( bt_insertkey (bt, key, len, line, *tod, 0) )
2635                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2636                           len = 0;
2637                         }
2638                         else if( len < 255 )
2639                                 key[len++] = ch;
2640                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2641                 break;
2642
2643         case 'd':
2644                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2645                 if( in = fopen (args->infile, "rb") )
2646                   while( ch = getc(in), ch != EOF )
2647                         if( ch == '\n' )
2648                         {
2649                           line++;
2650                           if( args->num == 1 )
2651                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2652
2653                           else if( args->num )
2654                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2655
2656                           if( bt_deletekey (bt, key, len) )
2657                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2658                           len = 0;
2659                         }
2660                         else if( len < 255 )
2661                                 key[len++] = ch;
2662                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2663                 break;
2664
2665         case 'f':
2666                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2667                 if( in = fopen (args->infile, "rb") )
2668                   while( ch = getc(in), ch != EOF )
2669                         if( ch == '\n' )
2670                         {
2671                           line++;
2672                           if( args->num == 1 )
2673                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2674
2675                           else if( args->num )
2676                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2677
2678                           if( bt_findkey (bt, key, len) )
2679                                 found++;
2680                           else if( bt->err )
2681                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2682                           len = 0;
2683                         }
2684                         else if( len < 255 )
2685                                 key[len++] = ch;
2686                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2687                 break;
2688
2689         case 's':
2690                 fprintf(stderr, "started scanning\n");
2691                 do {
2692                         if( set->pool = bt_pinpool (bt, page_no) )
2693                                 set->page = bt_page (bt, set->pool, page_no);
2694                         else
2695                                 break;
2696                         set->latch = bt_pinlatch (bt, page_no);
2697                         bt_lockpage (BtLockRead, set->latch);
2698                         next = bt_getid (set->page->right);
2699                         cnt += set->page->act;
2700
2701                         for( slot = 0; slot++ < set->page->cnt; )
2702                          if( next || slot < set->page->cnt )
2703                           if( !slotptr(set->page, slot)->dead ) {
2704                                 ptr = keyptr(set->page, slot);
2705                                 fwrite (ptr->key, ptr->len, 1, stdout);
2706                                 fputc ('\n', stdout);
2707                           }
2708
2709                         bt_unlockpage (BtLockRead, set->latch);
2710                         bt_unpinlatch (set->latch);
2711                         bt_unpinpool (set->pool);
2712                 } while( page_no = next );
2713
2714                 cnt--;  // remove stopper key
2715                 fprintf(stderr, " Total keys read %d\n", cnt);
2716                 break;
2717
2718         case 'c':
2719                 fprintf(stderr, "started counting\n");
2720                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2721                 page_no = LEAF_page;
2722
2723                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2724                         pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, page_no << bt->mgr->page_bits);
2725                         if( !bt->frame->free && !bt->frame->lvl )
2726                                 cnt += bt->frame->act;
2727                         if( page_no > LEAF_page )
2728                                 next = page_no + 1;
2729                         page_no = next;
2730                 }
2731                 
2732                 cnt--;  // remove stopper key
2733                 fprintf(stderr, " Total keys read %d\n", cnt);
2734                 break;
2735         }
2736
2737         bt_close (bt);
2738 #ifdef unix
2739         return NULL;
2740 #else
2741         return 0;
2742 #endif
2743 }
2744
2745 typedef struct timeval timer;
2746
2747 int main (int argc, char **argv)
2748 {
2749 int idx, cnt, len, slot, err;
2750 int segsize, bits = 16;
2751 #ifdef unix
2752 pthread_t *threads;
2753 timer start, stop;
2754 #else
2755 time_t start[1], stop[1];
2756 HANDLE *threads;
2757 #endif
2758 double real_time;
2759 ThreadArg *args;
2760 uint poolsize = 0;
2761 int num = 0;
2762 char key[1];
2763 BtMgr *mgr;
2764 BtKey ptr;
2765 BtDb *bt;
2766
2767         if( argc < 3 ) {
2768                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2769                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2770                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2771                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2772                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2773                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2774                 exit(0);
2775         }
2776
2777 #ifdef unix
2778         gettimeofday(&start, NULL);
2779 #else
2780         time(start);
2781 #endif
2782
2783         if( argc > 3 )
2784                 bits = atoi(argv[3]);
2785
2786         if( argc > 4 )
2787                 poolsize = atoi(argv[4]);
2788
2789         if( !poolsize )
2790                 fprintf (stderr, "Warning: no mapped_pool\n");
2791
2792         if( poolsize > 65535 )
2793                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2794
2795         if( argc > 5 )
2796                 segsize = atoi(argv[5]);
2797         else
2798                 segsize = 4;    // 16 pages per mmap segment
2799
2800         if( argc > 6 )
2801                 num = atoi(argv[6]);
2802
2803         cnt = argc - 7;
2804 #ifdef unix
2805         threads = malloc (cnt * sizeof(pthread_t));
2806 #else
2807         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2808 #endif
2809         args = malloc (cnt * sizeof(ThreadArg));
2810
2811         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2812
2813         if( !mgr ) {
2814                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2815                 exit (1);
2816         }
2817
2818         //      fire off threads
2819
2820         for( idx = 0; idx < cnt; idx++ ) {
2821                 args[idx].infile = argv[idx + 7];
2822                 args[idx].type = argv[2][0];
2823                 args[idx].mgr = mgr;
2824                 args[idx].num = num;
2825                 args[idx].idx = idx;
2826 #ifdef unix
2827                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2828                         fprintf(stderr, "Error creating thread %d\n", err);
2829 #else
2830                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2831 #endif
2832         }
2833
2834         //      wait for termination
2835
2836 #ifdef unix
2837         for( idx = 0; idx < cnt; idx++ )
2838                 pthread_join (threads[idx], NULL);
2839         gettimeofday(&stop, NULL);
2840         real_time = 1000.0 * ( stop.tv_sec - start.tv_sec ) + 0.001 * (stop.tv_usec - start.tv_usec );
2841 #else
2842         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2843
2844         for( idx = 0; idx < cnt; idx++ )
2845                 CloseHandle(threads[idx]);
2846
2847         time (stop);
2848         real_time = 1000 * (*stop - *start);
2849 #endif
2850         fprintf(stderr, " Time to complete: %.2f seconds\n", real_time/1000);
2851         bt_mgrclose (mgr);
2852 }
2853
2854 #endif  //STANDALONE