]> pd.if.org Git - btree/blob - threadskv2.c
Add multi/thread multi-process threadskv10h.c
[btree] / threadskv2.c
1 // btree version threadskv2 sched_yield version
2 //      with reworked bt_deletekey code
3 //      phase-fair reader writer lock
4 //      generalized key-value interface
5 //
6 //      reworked btree node as red/black binomial tree
7 // 27 AUG 2014
8
9 // author: karl malbrain, malbrain@cal.berkeley.edu
10
11 /*
12 This work, including the source code, documentation
13 and related data, is placed into the public domain.
14
15 The orginal author is Karl Malbrain.
16
17 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
18 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
19 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
20 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
21 RESULTING FROM THE USE, MODIFICATION, OR
22 REDISTRIBUTION OF THIS SOFTWARE.
23 */
24
25 // Please see the project home page for documentation
26 // code.google.com/p/high-concurrency-btree
27
28 #define _FILE_OFFSET_BITS 64
29 #define _LARGEFILE64_SOURCE
30
31 #ifdef linux
32 #define _GNU_SOURCE
33 #endif
34
35 #ifdef unix
36 #include <unistd.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <fcntl.h>
40 #include <sys/mman.h>
41 #include <errno.h>
42 #include <pthread.h>
43 #else
44 #define WIN32_LEAN_AND_MEAN
45 #include <windows.h>
46 #include <stdio.h>
47 #include <stdlib.h>
48 #include <fcntl.h>
49 #include <process.h>
50 #include <intrin.h>
51 #endif
52
53 #include <memory.h>
54 #include <string.h>
55 #include <stddef.h>
56
57 typedef unsigned long long      uid;
58
59 #ifndef unix
60 typedef unsigned long long      off64_t;
61 typedef unsigned short          ushort;
62 typedef unsigned int            uint;
63 #endif
64
65 #define BT_latchtable   128                                     // number of latch manager slots
66
67 #define BT_ro 0x6f72    // ro
68 #define BT_rw 0x7772    // rw
69
70 #define BT_maxbits              24                                      // maximum page size in bits
71 #define BT_minbits              9                                       // minimum page size in bits
72 #define BT_minpage              (1 << BT_minbits)       // minimum page size
73 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
74
75 #define BT_binomial             5                                       // number of levels to emit together
76 /*
77 There are five lock types for each node in three independent sets: 
78 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
79 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
80 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
81 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
82 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
83 */
84
85 typedef enum{
86         BtLockAccess,
87         BtLockDelete,
88         BtLockRead,
89         BtLockWrite,
90         BtLockParent
91 } BtLock;
92
93 //      definition for phase-fair reader/writer lock implementation
94
95 typedef struct {
96         ushort rin[1];
97         ushort rout[1];
98         ushort ticket[1];
99         ushort serving[1];
100 } RWLock;
101
102 #define PHID 0x1
103 #define PRES 0x2
104 #define MASK 0x3
105 #define RINC 0x4
106
107 //      definition for spin latch implementation
108
109 // exclusive is set for write access
110 // share is count of read accessors
111 // grant write lock when share == 0
112
113 volatile typedef struct {
114         ushort exclusive:1;
115         ushort pending:1;
116         ushort share:14;
117 } BtSpinLatch;
118
119 #define XCL 1
120 #define PEND 2
121 #define BOTH 3
122 #define SHARE 4
123
124 //  hash table entries
125
126 typedef struct {
127         BtSpinLatch latch[1];
128         volatile ushort slot;           // Latch table entry at head of chain
129 } BtHashEntry;
130
131 //      latch manager table structure
132
133 typedef struct {
134         RWLock readwr[1];               // read/write page lock
135         RWLock access[1];               // Access Intent/Page delete
136         RWLock parent[1];               // Posting of fence key in parent
137         BtSpinLatch busy[1];            // slot is being moved between chains
138         volatile ushort next;           // next entry in hash table chain
139         volatile ushort prev;           // prev entry in hash table chain
140         volatile ushort pin;            // number of outstanding locks
141         volatile ushort hash;           // hash slot entry is under
142         volatile uid page_no;           // latch set page number
143 } BtLatchSet;
144
145 //      Define the length of the page and key pointers
146
147 #define BtId 6
148
149 //      Page key slot definition.
150
151 //      Keys are marked dead, but remain on the page until
152 //      cleanup is called. The fence key (highest key) for
153 //      a leaf page is always present, even after cleanup.
154
155 typedef struct {
156         uint off:BT_maxbits;    // page offset for key start
157         uint fence:1;                   // is tree node the fence key?
158         uint red:1;                             // is tree node red?
159         uint dead:1;                    // set for deleted key
160         uint left, right;               // next nodes down
161 } BtSlot;
162
163 //      The key structure occupies space at the upper end of
164 //      each page.  It's a length byte followed by the key
165 //      bytes.
166
167 typedef struct {
168         unsigned char len;
169         unsigned char key[1];
170 } *BtKey;
171
172 //      the value structure also occupies space at the upper
173 //      end of the page.
174
175 typedef struct {
176         unsigned char len;
177         unsigned char value[1];
178 } *BtVal;
179
180 //      The first part of an index page.
181 //      It is immediately followed
182 //      by the BtSlot array of keys.
183
184 typedef struct BtPage_ {
185         uint cnt;                                       // count of keys in page
186         uint act;                                       // count of active keys
187         uint min;                                       // next key offset
188         uint root;                                      // slot of root node
189         unsigned char bits:7;           // page size in bits
190         unsigned char free:1;           // page is on free chain
191         unsigned char lvl:6;            // level of page
192         unsigned char kill:1;           // page is being deleted
193         unsigned char dirty:1;          // page has deleted keys
194         unsigned char right[BtId];      // page number to right
195 } *BtPage;
196
197 //      The memory mapping pool table buffer manager entry
198
199 typedef struct {
200         uid  basepage;                          // mapped base page number
201         char *map;                                      // mapped memory pointer
202         ushort slot;                            // slot index in this array
203         ushort pin;                                     // mapped page pin counter
204         void *hashprev;                         // previous pool entry for the same hash idx
205         void *hashnext;                         // next pool entry for the same hash idx
206 #ifndef unix
207         HANDLE hmap;                            // Windows memory mapping handle
208 #endif
209 } BtPool;
210
211 #define CLOCK_bit 0x8000                // bit in pool->pin
212
213 //  The loadpage interface object
214
215 typedef struct {
216         uid page_no;            // current page number
217         BtPage page;            // current page pointer
218         BtPool *pool;           // current page pool
219         BtLatchSet *latch;      // current page latch set
220 } BtPageSet;
221
222 //      structure for latch manager on ALLOC_page
223
224 typedef struct {
225         struct BtPage_ alloc[1];        // next page_no in right ptr
226         unsigned char chain[BtId];      // head of free page_nos chain
227         BtSpinLatch lock[1];            // allocation area lite latch
228         ushort latchdeployed;           // highest number of latch entries deployed
229         ushort nlatchpage;                      // number of latch pages at BT_latch
230         ushort latchtotal;                      // number of page latch entries
231         ushort latchhash;                       // number of latch hash table slots
232         ushort latchvictim;                     // next latch entry to examine
233         BtHashEntry table[0];           // the hash table
234 } BtLatchMgr;
235
236 //      The object structure for Btree access
237
238 typedef struct {
239         uint page_size;                         // page size    
240         uint page_bits;                         // page size in bits    
241         uint seg_bits;                          // seg size in pages in bits
242         uint mode;                                      // read-write mode
243 #ifdef unix
244         int idx;
245 #else
246         HANDLE idx;
247 #endif
248         ushort poolcnt;                         // highest page pool node in use
249         ushort poolmax;                         // highest page pool node allocated
250         ushort poolmask;                        // total number of pages in mmap segment - 1
251         ushort hashsize;                        // size of Hash Table for pool entries
252         volatile uint evicted;          // last evicted hash table slot
253         ushort *hash;                           // pool index for hash entries
254         BtSpinLatch *latch;                     // latches for hash table slots
255         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
256         BtLatchSet *latchsets;          // mapped latch set from latch pages
257         BtPool *pool;                           // memory pool page segments
258 #ifndef unix
259         HANDLE halloc;                          // allocation and latch table handle
260 #endif
261 } BtMgr;
262
263 //      red-black tree descent stack
264
265 typedef struct {
266         uint slot:BT_maxbits;
267         int cmp:2;                      // comparison result
268 } BtPathEntry;
269
270 typedef struct {
271         int lvl;                        // height of the stack
272         int ge;                         // last node that is >= given node
273         BtPathEntry entry[BT_maxbits+2];        // stacked tree descent
274 } BtPathStk;
275
276 typedef struct {
277         BtMgr *mgr;                     // buffer manager for thread
278         unsigned char *mem;     // frame, cursor, page memory buffer
279         BtPathStk path[1];      // cached frame path stack for begin/next
280         BtPage cursor;          // cached frame for start/next (never mapped)
281         BtPage frame;           // spare frame for the page split (never mapped)
282         uint *que;                      // binomial key distribution buffer
283         int found;                      // last delete or insert was found
284         int base;                       // maximum binomial assignment
285         int err;                        // last error
286 } BtDb;
287
288 typedef enum {
289         BTERR_ok = 0,
290         BTERR_struct,
291         BTERR_ovflw,
292         BTERR_lock,
293         BTERR_map,
294         BTERR_wrt,
295         BTERR_hash
296 } BTERR;
297
298 // B-Tree functions
299 extern void bt_close (BtDb *bt);
300 extern BtDb *bt_open (BtMgr *mgr);
301 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint stopper);
302 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl, uint stopper);
303 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
304 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
305 extern uint bt_nextkey   (BtDb *bt);
306
307 //      manager functions
308 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
309 void bt_mgrclose (BtMgr *mgr);
310
311 //      forward definitions
312 uint bt_rbremovefence (BtPage page, uint slot, BtPathStk *path);
313
314 //  Helper functions to return slot values
315 //      from the cursor page.
316
317 extern BtKey bt_key (BtDb *bt, uint slot);
318 extern BtVal bt_val (BtDb *bt, uint slot);
319
320 //  BTree page number constants
321 #define ALLOC_page              0       // allocation & latch manager hash table
322 #define ROOT_page               1       // root of the btree
323 #define LEAF_page               2       // first page of leaves
324 #define LATCH_page              3       // pages for latch manager
325
326 //      Number of levels to create in a new BTree
327
328 #define MIN_lvl                 2
329
330 //  The page is allocated from low and hi ends.
331 //  The key slots are allocated from the bottom,
332 //      and are organized into a balanced binary tree.
333 //      The text and value of the key
334 //  are allocated from the top.  When the two
335 //  areas meet, the page is split into two.
336
337 //  A key consists of a length byte, two bytes of
338 //  index number (0 - 65534), and up to 253 bytes
339 //  of key value.  Duplicate keys are discarded.
340 //  Associated with each key is a value byte string
341 //      containing any value desired.
342
343 //  The b-tree root is always located at page 1.
344 //      The first leaf page of level zero is always
345 //      located on page 2.
346
347 //      The b-tree pages are linked with next
348 //      pointers to facilitate enumerators,
349 //      and provide for concurrency.
350
351 //      When to root page fills, it is split in two and
352 //      the tree height is raised by a new root at page
353 //      one with two keys.
354
355 //      Deleted keys are marked with a dead bit until
356 //      page cleanup. The fence key for a leaf node is
357 //      always present
358
359 //  Groups of pages called segments from the btree are optionally
360 //  cached with a memory mapped pool. A hash table is used to keep
361 //  track of the cached segments.  This behaviour is controlled
362 //  by the cache block size parameter to bt_open.
363
364 //  To achieve maximum concurrency one page is locked at a time
365 //  as the tree is traversed to find leaf key in question. The right
366 //  page numbers are used in cases where the page is being split,
367 //      or consolidated.
368
369 //  Page 0 is dedicated to lock for new page extensions,
370 //      and chains empty pages together for reuse. It also
371 //      contains the latch manager hash table.
372
373 //      The ParentModification lock on a node is obtained to serialize posting
374 //      or changing the fence key for a node.
375
376 //      Empty pages are chained together through the ALLOC page and reused.
377
378 //      Access macros to address slot and key values from the page
379 //      Page slots use 1 based indexing.
380
381 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
382 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
383 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
384
385 void bt_putid(unsigned char *dest, uid id)
386 {
387 int i = BtId;
388
389         while( i-- )
390                 dest[i] = (unsigned char)id, id >>= 8;
391 }
392
393 uid bt_getid(unsigned char *src)
394 {
395 uid id = 0;
396 int i;
397
398         for( i = 0; i < BtId; i++ )
399                 id <<= 8, id |= *src++; 
400
401         return id;
402 }
403
404 //      Phase-Fair reader/writer lock implementation
405
406 void WriteLock (RWLock *lock)
407 {
408 ushort w, r, tix;
409
410 #ifdef unix
411         tix = __sync_fetch_and_add (lock->ticket, 1);
412 #else
413         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
414 #endif
415         // wait for our ticket to come up
416
417         while( tix != lock->serving[0] )
418 #ifdef unix
419                 sched_yield();
420 #else
421                 SwitchToThread ();
422 #endif
423
424         w = PRES | (tix & PHID);
425 #ifdef  unix
426         r = __sync_fetch_and_add (lock->rin, w);
427 #else
428         r = _InterlockedExchangeAdd16 (lock->rin, w);
429 #endif
430         while( r != *lock->rout )
431 #ifdef unix
432                 sched_yield();
433 #else
434                 SwitchToThread();
435 #endif
436 }
437
438 void WriteRelease (RWLock *lock)
439 {
440 #ifdef unix
441         __sync_fetch_and_and (lock->rin, ~MASK);
442 #else
443         _InterlockedAnd16 (lock->rin, ~MASK);
444 #endif
445         lock->serving[0]++;
446 }
447
448 void ReadLock (RWLock *lock)
449 {
450 ushort w;
451 #ifdef unix
452         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
453 #else
454         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
455 #endif
456         if( w )
457           while( w == (*lock->rin & MASK) )
458 #ifdef unix
459                 sched_yield ();
460 #else
461                 SwitchToThread ();
462 #endif
463 }
464
465 void ReadRelease (RWLock *lock)
466 {
467 #ifdef unix
468         __sync_fetch_and_add (lock->rout, RINC);
469 #else
470         _InterlockedExchangeAdd16 (lock->rout, RINC);
471 #endif
472 }
473
474 //      Spin Latch Manager
475
476 //      wait until write lock mode is clear
477 //      and add 1 to the share count
478
479 void bt_spinreadlock(BtSpinLatch *latch)
480 {
481 ushort prev;
482
483   do {
484 #ifdef unix
485         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
486 #else
487         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
488 #endif
489         //  see if exclusive request is granted or pending
490
491         if( !(prev & BOTH) )
492                 return;
493 #ifdef unix
494         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
495 #else
496         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
497 #endif
498 #ifdef  unix
499   } while( sched_yield(), 1 );
500 #else
501   } while( SwitchToThread(), 1 );
502 #endif
503 }
504
505 //      wait for other read and write latches to relinquish
506
507 void bt_spinwritelock(BtSpinLatch *latch)
508 {
509 ushort prev;
510
511   do {
512 #ifdef  unix
513         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
514 #else
515         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
516 #endif
517         if( !(prev & XCL) )
518           if( !(prev & ~BOTH) )
519                 return;
520           else
521 #ifdef unix
522                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
523 #else
524                 _InterlockedAnd16((ushort *)latch, ~XCL);
525 #endif
526 #ifdef  unix
527   } while( sched_yield(), 1 );
528 #else
529   } while( SwitchToThread(), 1 );
530 #endif
531 }
532
533 //      try to obtain write lock
534
535 //      return 1 if obtained,
536 //              0 otherwise
537
538 int bt_spinwritetry(BtSpinLatch *latch)
539 {
540 ushort prev;
541
542 #ifdef  unix
543         prev = __sync_fetch_and_or((ushort *)latch, XCL);
544 #else
545         prev = _InterlockedOr16((ushort *)latch, XCL);
546 #endif
547         //      take write access if all bits are clear
548
549         if( !(prev & XCL) )
550           if( !(prev & ~BOTH) )
551                 return 1;
552           else
553 #ifdef unix
554                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
555 #else
556                 _InterlockedAnd16((ushort *)latch, ~XCL);
557 #endif
558         return 0;
559 }
560
561 //      clear write mode
562
563 void bt_spinreleasewrite(BtSpinLatch *latch)
564 {
565 #ifdef unix
566         __sync_fetch_and_and((ushort *)latch, ~BOTH);
567 #else
568         _InterlockedAnd16((ushort *)latch, ~BOTH);
569 #endif
570 }
571
572 //      decrement reader count
573
574 void bt_spinreleaseread(BtSpinLatch *latch)
575 {
576 #ifdef unix
577         __sync_fetch_and_add((ushort *)latch, -SHARE);
578 #else
579         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
580 #endif
581 }
582
583 //      link latch table entry into latch hash table
584
585 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
586 {
587 BtLatchSet *set = bt->mgr->latchsets + victim;
588
589         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
590                 bt->mgr->latchsets[set->next].prev = victim;
591
592         bt->mgr->latchmgr->table[hashidx].slot = victim;
593         set->page_no = page_no;
594         set->hash = hashidx;
595         set->prev = 0;
596 }
597
598 //      release latch pin
599
600 void bt_unpinlatch (BtLatchSet *set)
601 {
602 #ifdef unix
603         __sync_fetch_and_add(&set->pin, -1);
604 #else
605         _InterlockedDecrement16 (&set->pin);
606 #endif
607 }
608
609 //      find existing latchset or inspire new one
610 //      return with latchset pinned
611
612 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
613 {
614 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
615 ushort slot, avail = 0, victim, idx;
616 BtLatchSet *set;
617
618         //  obtain read lock on hash table entry
619
620         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
621
622         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
623         {
624                 set = bt->mgr->latchsets + slot;
625                 if( page_no == set->page_no )
626                         break;
627         } while( slot = set->next );
628
629         if( slot ) {
630 #ifdef unix
631                 __sync_fetch_and_add(&set->pin, 1);
632 #else
633                 _InterlockedIncrement16 (&set->pin);
634 #endif
635         }
636
637     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
638
639         if( slot )
640                 return set;
641
642   //  try again, this time with write lock
643
644   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
645
646   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
647   {
648         set = bt->mgr->latchsets + slot;
649         if( page_no == set->page_no )
650                 break;
651         if( !set->pin && !avail )
652                 avail = slot;
653   } while( slot = set->next );
654
655   //  found our entry, or take over an unpinned one
656
657   if( slot || (slot = avail) ) {
658         set = bt->mgr->latchsets + slot;
659 #ifdef unix
660         __sync_fetch_and_add(&set->pin, 1);
661 #else
662         _InterlockedIncrement16 (&set->pin);
663 #endif
664         set->page_no = page_no;
665         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
666         return set;
667   }
668
669         //  see if there are any unused entries
670 #ifdef unix
671         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
672 #else
673         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
674 #endif
675
676         if( victim < bt->mgr->latchmgr->latchtotal ) {
677                 set = bt->mgr->latchsets + victim;
678 #ifdef unix
679                 __sync_fetch_and_add(&set->pin, 1);
680 #else
681                 _InterlockedIncrement16 (&set->pin);
682 #endif
683                 bt_latchlink (bt, hashidx, victim, page_no);
684                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
685                 return set;
686         }
687
688 #ifdef unix
689         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
690 #else
691         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
692 #endif
693   //  find and reuse previous lock entry
694
695   while( 1 ) {
696 #ifdef unix
697         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
698 #else
699         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
700 #endif
701         //      we don't use slot zero
702
703         if( victim %= bt->mgr->latchmgr->latchtotal )
704                 set = bt->mgr->latchsets + victim;
705         else
706                 continue;
707
708         //      take control of our slot
709         //      from other threads
710
711         if( set->pin || !bt_spinwritetry (set->busy) )
712                 continue;
713
714         idx = set->hash;
715
716         // try to get write lock on hash chain
717         //      skip entry if not obtained
718         //      or has outstanding locks
719
720         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
721                 bt_spinreleasewrite (set->busy);
722                 continue;
723         }
724
725         if( set->pin ) {
726                 bt_spinreleasewrite (set->busy);
727                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
728                 continue;
729         }
730
731         //  unlink our available victim from its hash chain
732
733         if( set->prev )
734                 bt->mgr->latchsets[set->prev].next = set->next;
735         else
736                 bt->mgr->latchmgr->table[idx].slot = set->next;
737
738         if( set->next )
739                 bt->mgr->latchsets[set->next].prev = set->prev;
740
741         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
742 #ifdef unix
743         __sync_fetch_and_add(&set->pin, 1);
744 #else
745         _InterlockedIncrement16 (&set->pin);
746 #endif
747         bt_latchlink (bt, hashidx, victim, page_no);
748         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
749         bt_spinreleasewrite (set->busy);
750         return set;
751   }
752 }
753
754 void bt_mgrclose (BtMgr *mgr)
755 {
756 BtPool *pool;
757 uint slot;
758
759         // release mapped pages
760         //      note that slot zero is never used
761
762         for( slot = 1; slot < mgr->poolmax; slot++ ) {
763                 pool = mgr->pool + slot;
764                 if( pool->slot )
765 #ifdef unix
766                         munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
767 #else
768                 {
769                         FlushViewOfFile(pool->map, 0);
770                         UnmapViewOfFile(pool->map);
771                         CloseHandle(pool->hmap);
772                 }
773 #endif
774         }
775
776 #ifdef unix
777         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
778         munmap (mgr->latchmgr, 2 * mgr->page_size);
779 #else
780         FlushViewOfFile(mgr->latchmgr, 0);
781         UnmapViewOfFile(mgr->latchmgr);
782         CloseHandle(mgr->halloc);
783 #endif
784 #ifdef unix
785         close (mgr->idx);
786         free (mgr->pool);
787         free (mgr->hash);
788         free ((void *)mgr->latch);
789         free (mgr);
790 #else
791         FlushFileBuffers(mgr->idx);
792         CloseHandle(mgr->idx);
793         GlobalFree (mgr->pool);
794         GlobalFree (mgr->hash);
795         GlobalFree ((void *)mgr->latch);
796         GlobalFree (mgr);
797 #endif
798 }
799
800 //      close and release memory
801
802 void bt_close (BtDb *bt)
803 {
804 #ifdef unix
805         if( bt->mem )
806                 free (bt->mem);
807 #else
808         if( bt->mem)
809                 VirtualFree (bt->mem, 0, MEM_RELEASE);
810 #endif
811         free (bt);
812 }
813
814 //  open/create new btree buffer manager
815
816 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
817 //              size of mapped page pool (e.g. 8192)
818
819 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
820 {
821 uint lvl, attr, cacheblk, last, slot, idx;
822 uint nlatchpage, latchhash;
823 unsigned char value[BtId];
824 BtLatchMgr *latchmgr;
825 off64_t size;
826 uint amt[1];
827 BtMgr* mgr;
828 BtKey key;
829 BtVal val;
830 int flag;
831
832 #ifndef unix
833 SYSTEM_INFO sysinfo[1];
834 #endif
835
836         // determine sanity of page size and buffer pool
837
838         if( bits > BT_maxbits )
839                 bits = BT_maxbits;
840         else if( bits < BT_minbits )
841                 bits = BT_minbits;
842
843         if( !poolmax )
844                 return NULL;    // must have buffer pool
845
846 #ifdef unix
847         mgr = calloc (1, sizeof(BtMgr));
848
849         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
850
851         if( mgr->idx == -1 )
852                 return free(mgr), NULL;
853         
854         cacheblk = 4096;        // minimum mmap segment size for unix
855
856 #else
857         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
858         attr = FILE_ATTRIBUTE_NORMAL;
859         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
860
861         if( mgr->idx == INVALID_HANDLE_VALUE )
862                 return GlobalFree(mgr), NULL;
863
864         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
865         GetSystemInfo(sysinfo);
866         cacheblk = sysinfo->dwAllocationGranularity;
867 #endif
868
869 #ifdef unix
870         latchmgr = malloc (BT_maxpage);
871         *amt = 0;
872
873         // read minimum page size to get root info
874
875         if( size = lseek (mgr->idx, 0L, 2) ) {
876                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
877                         bits = latchmgr->alloc->bits;
878                 else
879                         return free(mgr), free(latchmgr), NULL;
880         } else if( mode == BT_ro )
881                 return free(latchmgr), bt_mgrclose (mgr), NULL;
882 #else
883         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
884         size = GetFileSize(mgr->idx, amt);
885
886         if( size || *amt ) {
887                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
888                         return bt_mgrclose (mgr), NULL;
889                 bits = latchmgr->alloc->bits;
890         } else if( mode == BT_ro )
891                 return bt_mgrclose (mgr), NULL;
892 #endif
893
894         mgr->page_size = 1 << bits;
895         mgr->page_bits = bits;
896
897         mgr->poolmax = poolmax;
898         mgr->mode = mode;
899
900         if( cacheblk < mgr->page_size )
901                 cacheblk = mgr->page_size;
902
903         //  mask for partial memmaps
904
905         mgr->poolmask = (cacheblk >> bits) - 1;
906
907         //      see if requested size of pages per memmap is greater
908
909         if( (1 << segsize) > mgr->poolmask )
910                 mgr->poolmask = (1 << segsize) - 1;
911
912         mgr->seg_bits = 0;
913
914         while( (1 << mgr->seg_bits) <= mgr->poolmask )
915                 mgr->seg_bits++;
916
917         mgr->hashsize = hashsize;
918
919 #ifdef unix
920         mgr->pool = calloc (poolmax, sizeof(BtPool));
921         mgr->hash = calloc (hashsize, sizeof(ushort));
922         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
923 #else
924         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
925         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
926         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
927 #endif
928
929         if( size || *amt )
930                 goto mgrlatch;
931
932         // initialize an empty b-tree with latch page, root page, page of leaves
933         // and page(s) of latches
934
935         memset (latchmgr, 0, 1 << bits);
936         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
937         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
938         latchmgr->alloc->bits = mgr->page_bits;
939
940         latchmgr->nlatchpage = nlatchpage;
941         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
942
943         //  initialize latch manager
944
945         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
946
947         //      size of hash table = total number of latchsets
948
949         if( latchhash > latchmgr->latchtotal )
950                 latchhash = latchmgr->latchtotal;
951
952         latchmgr->latchhash = latchhash;
953
954 #ifdef unix
955         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
956                 return bt_mgrclose (mgr), NULL;
957 #else
958         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
959                 return bt_mgrclose (mgr), NULL;
960
961         if( *amt < mgr->page_size )
962                 return bt_mgrclose (mgr), NULL;
963 #endif
964
965         memset (latchmgr, 0, 1 << bits);
966         latchmgr->alloc->bits = mgr->page_bits;
967
968         for( lvl=MIN_lvl; lvl--; ) {
969                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
970                 slotptr(latchmgr->alloc, 1)->fence = 1;
971                 if( !lvl )
972                         slotptr(latchmgr->alloc, 1)->dead = 1;
973                 key = keyptr(latchmgr->alloc, 1);
974                 key->len = 2;           // create stopper key
975                 key->key[0] = 0xff;
976                 key->key[1] = 0xff;
977
978                 bt_putid(value, MIN_lvl - lvl + 1);
979                 val = valptr(latchmgr->alloc, 1);
980                 val->len = lvl ? BtId : 0;
981                 memcpy (val->value, value, val->len);
982
983                 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
984                 latchmgr->alloc->root = 1;
985                 latchmgr->alloc->lvl = lvl;
986                 latchmgr->alloc->cnt = 1;
987                 latchmgr->alloc->act = 1;
988 #ifdef unix
989                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
990                         return bt_mgrclose (mgr), NULL;
991 #else
992                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
993                         return bt_mgrclose (mgr), NULL;
994
995                 if( *amt < mgr->page_size )
996                         return bt_mgrclose (mgr), NULL;
997 #endif
998         }
999
1000         // clear out latch manager locks
1001         //      and rest of pages to round out segment
1002
1003         memset(latchmgr, 0, mgr->page_size);
1004         last = MIN_lvl + 1;
1005
1006         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
1007 #ifdef unix
1008                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
1009 #else
1010                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
1011                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
1012                         return bt_mgrclose (mgr), NULL;
1013                 if( *amt < mgr->page_size )
1014                         return bt_mgrclose (mgr), NULL;
1015 #endif
1016                 last++;
1017         }
1018
1019 mgrlatch:
1020 #ifdef unix
1021         // mlock the root page and the latchmgr page
1022
1023         flag = PROT_READ | PROT_WRITE;
1024         mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
1025         if( mgr->latchmgr == MAP_FAILED )
1026                 return bt_mgrclose (mgr), NULL;
1027         mlock (mgr->latchmgr, 2 * mgr->page_size);
1028
1029         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1030         if( mgr->latchsets == MAP_FAILED )
1031                 return bt_mgrclose (mgr), NULL;
1032         mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1033 #else
1034         flag = PAGE_READWRITE;
1035         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1036         if( !mgr->halloc )
1037                 return bt_mgrclose (mgr), NULL;
1038
1039         flag = FILE_MAP_WRITE;
1040         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1041         if( !mgr->latchmgr )
1042                 return GetLastError(), bt_mgrclose (mgr), NULL;
1043
1044         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1045 #endif
1046
1047 #ifdef unix
1048         free (latchmgr);
1049 #else
1050         VirtualFree (latchmgr, 0, MEM_RELEASE);
1051 #endif
1052         return mgr;
1053 }
1054
1055 //      open BTree access method
1056 //      based on buffer manager
1057
1058 BtDb *bt_open (BtMgr *mgr)
1059 {
1060 BtDb *bt = malloc (sizeof(*bt));
1061
1062         memset (bt, 0, sizeof(*bt));
1063         bt->mgr = mgr;
1064 #ifdef unix
1065         bt->mem = malloc (3 *mgr->page_size);
1066 #else
1067         bt->mem = VirtualAlloc(NULL, 3 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1068 #endif
1069         bt->frame = (BtPage)bt->mem;
1070         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1071         bt->que = (uint *)(bt->mem + 2 * mgr->page_size);
1072         return bt;
1073 }
1074
1075 //  compare two keys, returning > 0, = 0, or < 0
1076 //  as the comparison value
1077 //      -1 -> go right
1078 //      1 -> go left
1079
1080 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1081 {
1082 uint len1 = key1->len;
1083 int ans;
1084
1085         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1086                 return ans > 0 ? 1 : -1;
1087
1088         if( len1 > len2 )
1089                 return 1;
1090         if( len1 < len2 )
1091                 return -1;
1092
1093         return 0;
1094 }
1095
1096 //      Buffer Pool mgr
1097
1098 // find segment in pool
1099 // must be called with hashslot idx locked
1100 //      return NULL if not there
1101 //      otherwise return node
1102
1103 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1104 {
1105 BtPool *pool;
1106 uint slot;
1107
1108         // compute start of hash chain in pool
1109
1110         if( slot = bt->mgr->hash[idx] ) 
1111                 pool = bt->mgr->pool + slot;
1112         else
1113                 return NULL;
1114
1115         page_no &= ~bt->mgr->poolmask;
1116
1117         while( pool->basepage != page_no )
1118           if( pool = pool->hashnext )
1119                 continue;
1120           else
1121                 return NULL;
1122
1123         return pool;
1124 }
1125
1126 // add segment to hash table
1127
1128 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1129 {
1130 BtPool *node;
1131 uint slot;
1132
1133         pool->hashprev = pool->hashnext = NULL;
1134         pool->basepage = page_no & ~bt->mgr->poolmask;
1135         pool->pin = CLOCK_bit + 1;
1136
1137         if( slot = bt->mgr->hash[idx] ) {
1138                 node = bt->mgr->pool + slot;
1139                 pool->hashnext = node;
1140                 node->hashprev = pool;
1141         }
1142
1143         bt->mgr->hash[idx] = pool->slot;
1144 }
1145
1146 //  map new buffer pool segment to virtual memory
1147
1148 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1149 {
1150 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1151 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1152 int flag;
1153
1154 #ifdef unix
1155         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1156         pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1157         if( pool->map == MAP_FAILED )
1158                 return bt->err = BTERR_map;
1159
1160 #else
1161         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1162         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1163         if( !pool->hmap )
1164                 return bt->err = BTERR_map;
1165
1166         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1167         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (bt->mgr->poolmask+1) << bt->mgr->page_bits);
1168         if( !pool->map )
1169                 return bt->err = BTERR_map;
1170 #endif
1171         return bt->err = 0;
1172 }
1173
1174 //      calculate page within pool
1175
1176 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1177 {
1178 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1179 BtPage page;
1180
1181         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1182 //      madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1183         return page;
1184 }
1185
1186 //  release pool pin
1187
1188 void bt_unpinpool (BtPool *pool)
1189 {
1190 #ifdef unix
1191         __sync_fetch_and_add(&pool->pin, -1);
1192 #else
1193         _InterlockedDecrement16 (&pool->pin);
1194 #endif
1195 }
1196
1197 //      find or place requested page in segment-pool
1198 //      return pool table entry, incrementing pin
1199
1200 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1201 {
1202 uint slot, hashidx, idx, victim;
1203 BtPool *pool, *node, *next;
1204
1205         //      lock hash table chain
1206
1207         hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1208         bt_spinwritelock (&bt->mgr->latch[hashidx]);
1209
1210         //      look up in hash table
1211
1212         if( pool = bt_findpool(bt, page_no, hashidx) ) {
1213 #ifdef unix
1214                 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1215                 __sync_fetch_and_add(&pool->pin, 1);
1216 #else
1217                 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1218                 _InterlockedIncrement16 (&pool->pin);
1219 #endif
1220                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1221                 return pool;
1222         }
1223
1224         // allocate a new pool node
1225         // and add to hash table
1226
1227 #ifdef unix
1228         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1229 #else
1230         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1231 #endif
1232
1233         if( ++slot < bt->mgr->poolmax ) {
1234                 pool = bt->mgr->pool + slot;
1235                 pool->slot = slot;
1236
1237                 if( bt_mapsegment(bt, pool, page_no) )
1238                         return NULL;
1239
1240                 bt_linkhash(bt, pool, page_no, hashidx);
1241                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1242                 return pool;
1243         }
1244
1245         // pool table is full
1246         //      find best pool entry to evict
1247
1248 #ifdef unix
1249         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1250 #else
1251         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1252 #endif
1253
1254         while( 1 ) {
1255 #ifdef unix
1256                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1257 #else
1258                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1259 #endif
1260                 victim %= bt->mgr->poolmax;
1261                 pool = bt->mgr->pool + victim;
1262                 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1263
1264                 if( !victim )
1265                         continue;
1266
1267                 // try to get write lock
1268                 //      skip entry if not obtained
1269
1270                 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1271                         continue;
1272
1273                 //      skip this entry if
1274                 //      page is pinned
1275                 //  or clock bit is set
1276
1277                 if( pool->pin ) {
1278 #ifdef unix
1279                         __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1280 #else
1281                         _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1282 #endif
1283                         bt_spinreleasewrite (&bt->mgr->latch[idx]);
1284                         continue;
1285                 }
1286
1287                 // unlink victim pool node from hash table
1288
1289                 if( node = pool->hashprev )
1290                         node->hashnext = pool->hashnext;
1291                 else if( node = pool->hashnext )
1292                         bt->mgr->hash[idx] = node->slot;
1293                 else
1294                         bt->mgr->hash[idx] = 0;
1295
1296                 if( node = pool->hashnext )
1297                         node->hashprev = pool->hashprev;
1298
1299                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1300
1301                 //      remove old file mapping
1302 #ifdef unix
1303                 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1304 #else
1305 //              FlushViewOfFile(pool->map, 0);
1306                 UnmapViewOfFile(pool->map);
1307                 CloseHandle(pool->hmap);
1308 #endif
1309                 pool->map = NULL;
1310
1311                 //  create new pool mapping
1312                 //  and link into hash table
1313
1314                 if( bt_mapsegment(bt, pool, page_no) )
1315                         return NULL;
1316
1317                 bt_linkhash(bt, pool, page_no, hashidx);
1318                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1319                 return pool;
1320         }
1321 }
1322
1323 // place write, read, or parent lock on requested page_no.
1324
1325 void bt_lockpage(BtLock mode, BtLatchSet *set)
1326 {
1327         switch( mode ) {
1328         case BtLockRead:
1329                 ReadLock (set->readwr);
1330                 break;
1331         case BtLockWrite:
1332                 WriteLock (set->readwr);
1333                 break;
1334         case BtLockAccess:
1335                 ReadLock (set->access);
1336                 break;
1337         case BtLockDelete:
1338                 WriteLock (set->access);
1339                 break;
1340         case BtLockParent:
1341                 WriteLock (set->parent);
1342                 break;
1343         }
1344 }
1345
1346 // remove write, read, or parent lock on requested page
1347
1348 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1349 {
1350         switch( mode ) {
1351         case BtLockRead:
1352                 ReadRelease (set->readwr);
1353                 break;
1354         case BtLockWrite:
1355                 WriteRelease (set->readwr);
1356                 break;
1357         case BtLockAccess:
1358                 ReadRelease (set->access);
1359                 break;
1360         case BtLockDelete:
1361                 WriteRelease (set->access);
1362                 break;
1363         case BtLockParent:
1364                 WriteRelease (set->parent);
1365                 break;
1366         }
1367 }
1368
1369 //      allocate a new page and write page into it
1370
1371 uid bt_newpage(BtDb *bt, BtPage page)
1372 {
1373 BtPageSet set[1];
1374 uid new_page;
1375 int blk;
1376
1377         //      lock allocation page
1378
1379         bt_spinwritelock(bt->mgr->latchmgr->lock);
1380
1381         // use empty chain first
1382         // else allocate empty page
1383
1384         if( new_page = bt_getid(bt->mgr->latchmgr->chain) ) {
1385                 if( set->pool = bt_pinpool (bt, new_page) )
1386                         set->page = bt_page (bt, set->pool, new_page);
1387                 else
1388                         return 0;
1389
1390                 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1391                 bt_unpinpool (set->pool);
1392         } else {
1393                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1394                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1395 #ifdef unix
1396                 // if writing first page of pool block, set file length thru last page
1397
1398                 if( (new_page & bt->mgr->poolmask) == 0 )
1399                         ftruncate (bt->mgr->idx, (new_page + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1400 #endif
1401         }
1402 #ifdef unix
1403         // unlock allocation latch
1404
1405         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1406 #endif
1407
1408         //      bring new page into pool and copy page.
1409         //      this will extend the file into the new pages on WIN32.
1410
1411         if( set->pool = bt_pinpool (bt, new_page) )
1412                 set->page = bt_page (bt, set->pool, new_page);
1413         else
1414                 return 0;
1415
1416         memcpy(set->page, page, bt->mgr->page_size);
1417         bt_unpinpool (set->pool);
1418
1419 #ifndef unix
1420         // unlock allocation latch
1421
1422         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1423 #endif
1424         return new_page;
1425 }
1426
1427 //  find slot in page for given key at a given level
1428
1429 uint bt_findslot (BtPage page, BtPage src, unsigned char *key, uint len, BtPathStk *path, uint stopper)
1430 {
1431 BtSlot *node;
1432 uint slot;
1433
1434   path->lvl = -1;
1435   path->ge = -1;
1436
1437   if( slot = page->root ) do {
1438         path->entry[++path->lvl].slot = slot;
1439         node = slotptr(page,slot);
1440
1441         if( node->fence && !bt_getid(page->right) && stopper )
1442           path->entry[path->lvl].cmp = 0;
1443         else if( node->fence && !bt_getid(page->right) )
1444           path->entry[path->lvl].cmp = 1;
1445         else if( stopper )
1446           path->entry[path->lvl].cmp = -1;
1447         else
1448           path->entry[path->lvl].cmp = keycmp (keyptr(src, slot), key, len);
1449
1450         if( path->entry[path->lvl].cmp == 0 )
1451                 return path->ge = path->lvl, slot;
1452
1453         if( path->entry[path->lvl].cmp > 0 )
1454                 path->ge = path->lvl, slot = node->left;
1455         else
1456                 slot = node->right;
1457   } while( slot && path->lvl < BT_maxbits );
1458   else
1459         return 0;
1460
1461   return path->entry[path->lvl].slot;
1462 }
1463
1464 // return next slot on the page using the path stack
1465
1466 uint bt_nextslot (BtPage page, BtPathStk *path)
1467 {
1468 uint slot, next;
1469 BtSlot *node;
1470
1471         slot = path->entry[path->lvl].slot;
1472         node = slotptr(page,slot);
1473
1474         if( slot = node->right ) {
1475           path->entry[path->lvl++].cmp = -1;
1476           path->entry[path->lvl].slot = slot;
1477
1478           while( slot = slotptr(page,slot)->left ) {
1479                 path->entry[path->lvl++].cmp = 1;
1480                 path->entry[path->lvl].slot = slot;
1481           }
1482
1483           return path->entry[path->lvl].slot;
1484         }
1485
1486         while( path->lvl )
1487           if( path->entry[--path->lvl].cmp > 0 )
1488                 return path->entry[path->lvl].slot;
1489
1490         return 0;
1491 }
1492
1493 //  find and load page at given level for given key
1494 //      leave page rd or wr locked as requested
1495
1496 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, BtPathStk *path, uint stopper)
1497 {
1498 uid page_no = ROOT_page, prevpage = 0;
1499 uint drill = 0xff, slot;
1500 BtLatchSet *prevlatch;
1501 uint mode, prevmode;
1502 BtPool *prevpool;
1503
1504   //  start at root of btree and drill down
1505
1506   do {
1507         // determine lock mode of drill level
1508         mode = (drill == lvl) ? lock : BtLockRead; 
1509
1510         set->latch = bt_pinlatch (bt, page_no);
1511         set->page_no = page_no;
1512
1513         // pin page contents
1514
1515         if( set->pool = bt_pinpool (bt, page_no) )
1516                 set->page = bt_page (bt, set->pool, page_no);
1517         else
1518                 return 0;
1519
1520         // obtain access lock using lock chaining with Access mode
1521
1522         if( page_no > ROOT_page )
1523           bt_lockpage(BtLockAccess, set->latch);
1524
1525         //      release & unpin parent page
1526
1527         if( prevpage ) {
1528           bt_unlockpage(prevmode, prevlatch);
1529           bt_unpinlatch (prevlatch);
1530           bt_unpinpool (prevpool);
1531           prevpage = 0;
1532         }
1533
1534         // obtain read lock using lock chaining
1535
1536         bt_lockpage(mode, set->latch);
1537
1538         if( set->page->free )
1539                 return bt->err = BTERR_struct, 0;
1540
1541         if( page_no > ROOT_page )
1542           bt_unlockpage(BtLockAccess, set->latch);
1543
1544         // re-read and re-lock root after determining actual level of root
1545
1546         if( set->page->lvl != drill) {
1547                 if( set->page_no != ROOT_page )
1548                         return bt->err = BTERR_struct, 0;
1549                         
1550                 drill = set->page->lvl;
1551
1552                 if( lock != BtLockRead && drill == lvl ) {
1553                   bt_unlockpage(mode, set->latch);
1554                   bt_unpinlatch (set->latch);
1555                   bt_unpinpool (set->pool);
1556                   continue;
1557                 }
1558         }
1559
1560         prevpage = set->page_no;
1561         prevlatch = set->latch;
1562         prevpool = set->pool;
1563         prevmode = mode;
1564
1565         //  find key on page at this level
1566         //  and descend to requested level
1567
1568         if( set->page->kill )
1569           goto slideright;
1570
1571         slot = bt_findslot (set->page, set->page, key, len, path, stopper);
1572
1573         if( path->ge < 0 )
1574           goto slideright;
1575
1576         if( drill == lvl )
1577           return slot;
1578
1579         slot = path->entry[path->ge].slot;
1580         path->lvl = path->ge;
1581
1582         while( slotptr(set->page, slot)->dead )
1583           if( slot = bt_nextslot (set->page, path) )
1584                 continue;
1585           else
1586                 goto slideright;
1587
1588         page_no = bt_getid(valptr(set->page, slot)->value);
1589         drill--;
1590         continue;
1591
1592         //  or slide right into next page
1593
1594 slideright:
1595         page_no = bt_getid(set->page->right);
1596
1597   } while( page_no );
1598
1599   // return error on end of right chain
1600
1601   bt->err = BTERR_struct;
1602   return 0;     // return error
1603 }
1604
1605 //      return page to free list
1606 //      page must be delete & write locked
1607
1608 void bt_freepage (BtDb *bt, BtPageSet *set)
1609 {
1610         //      lock allocation page
1611
1612         bt_spinwritelock (bt->mgr->latchmgr->lock);
1613
1614         //      store chain
1615         memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1616         bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1617         set->page->free = 1;
1618
1619         // unlock released page
1620
1621         bt_unlockpage (BtLockDelete, set->latch);
1622         bt_unlockpage (BtLockWrite, set->latch);
1623         bt_unpinlatch (set->latch);
1624         bt_unpinpool (set->pool);
1625
1626         // unlock allocation page
1627
1628         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1629 }
1630
1631 //      a fence key was deleted from a page
1632 //      push new fence value upwards in the btree
1633
1634 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1635 {
1636 unsigned char leftkey[256], rightkey[256];
1637 unsigned char value[BtId];
1638 BtPathStk path[1];
1639 uint slot, fence;
1640 uid page_no;
1641 BtKey ptr;
1642
1643         //      remove the old fence value
1644
1645         fence = set->page->root;
1646         slot = set->page->root;
1647         path->lvl = -1;
1648
1649         do path->entry[++path->lvl].slot = fence = slot;
1650         while( slot = slotptr(set->page,slot)->right );
1651
1652         ptr = keyptr(set->page, fence);
1653         memcpy (rightkey, ptr, ptr->len + 1);
1654
1655         do fence = bt_rbremovefence (set->page, fence, path);
1656         while( slotptr(set->page,fence)->dead );
1657
1658         set->page->dirty = 1;
1659
1660         ptr = keyptr(set->page, fence);
1661         memcpy (leftkey, ptr, ptr->len + 1);
1662         page_no = set->page_no;
1663
1664         bt_lockpage (BtLockParent, set->latch);
1665         bt_unlockpage (BtLockWrite, set->latch);
1666
1667         //      insert new (now smaller) fence key
1668
1669         bt_putid (value, page_no);
1670
1671         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId, 0) )
1672           return bt->err;
1673
1674         //      now delete old fence key
1675
1676         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1, !bt_getid (set->page->right)) )
1677                 return bt->err;
1678
1679         bt_unlockpage (BtLockParent, set->latch);
1680         bt_unpinlatch(set->latch);
1681         bt_unpinpool (set->pool);
1682         return 0;
1683 }
1684
1685 //      root has a single child
1686 //      collapse a level from the tree
1687
1688 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1689 {
1690 BtPageSet child[1];
1691 uint slot;
1692
1693   // find the child entry and promote as new root contents
1694
1695   do {
1696         slot = root->page->root;
1697         child->page_no = bt_getid (valptr(root->page, slot)->value);
1698
1699         child->latch = bt_pinlatch (bt, child->page_no);
1700         bt_lockpage (BtLockDelete, child->latch);
1701         bt_lockpage (BtLockWrite, child->latch);
1702
1703         if( child->pool = bt_pinpool (bt, child->page_no) )
1704                 child->page = bt_page (bt, child->pool, child->page_no);
1705         else
1706                 return bt->err;
1707
1708         memcpy (root->page, child->page, bt->mgr->page_size);
1709         bt_freepage (bt, child);
1710
1711   } while( root->page->lvl > 1 && root->page->act == 1 );
1712
1713   bt_unlockpage (BtLockWrite, root->latch);
1714   bt_unpinlatch (root->latch);
1715   bt_unpinpool (root->pool);
1716   return 0;
1717 }
1718
1719 //  find and delete key on page by marking delete flag bit
1720 //  if page becomes empty, delete it from the btree
1721
1722 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl, uint stopper)
1723 {
1724 unsigned char lowerfence[256], higherfence[256];
1725 uint slot, idx, dirty = 0, fence, found;
1726 BtPageSet set[1], right[1];
1727 unsigned char value[BtId];
1728 BtPathStk path[1];
1729 BtSlot *node;
1730 BtKey ptr;
1731 BtVal val;
1732
1733         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite, path, stopper) )
1734                 node = slotptr(set->page, slot);
1735         else
1736                 return bt->err;
1737
1738         // if key is found delete it, otherwise ignore request
1739
1740         if( found = !path->entry[path->lvl].cmp )
1741           if( found = node->dead == 0 ) {
1742                 dirty = node->dead = 1;
1743                 set->page->dirty = 1;
1744                 set->page->act--;
1745           }
1746
1747         //      did we delete a fence key in an upper level?
1748
1749         if( lvl && node->fence )
1750          if( dirty && set->page->act )
1751           if( bt_fixfence (bt, set, lvl) )
1752                 return bt->err;
1753           else
1754                 return bt->found = found, 0;
1755
1756         //      is this a collapsed root?
1757
1758         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1759           if( bt_collapseroot (bt, set) )
1760                 return bt->err;
1761           else
1762                 return bt->found = found, 0;
1763
1764         //      return if page is not empty
1765
1766         if( set->page->act ) {
1767                 bt_unlockpage(BtLockWrite, set->latch);
1768                 bt_unpinlatch (set->latch);
1769                 bt_unpinpool (set->pool);
1770                 return bt->found = found, 0;
1771         }
1772
1773         //      cache copy of fence key
1774         //      to post in parent
1775
1776         fence = set->page->root;
1777         slot = set->page->root;
1778
1779         while( slot = slotptr(set->page,slot)->right )
1780                 fence = slot;
1781
1782         ptr = keyptr(set->page, fence);
1783         memcpy (lowerfence, ptr, ptr->len + 1);
1784
1785         //      obtain lock on right page
1786
1787         right->page_no = bt_getid(set->page->right);
1788         right->latch = bt_pinlatch (bt, right->page_no);
1789         bt_lockpage (BtLockWrite, right->latch);
1790
1791         // pin page contents
1792
1793         if( right->pool = bt_pinpool (bt, right->page_no) )
1794                 right->page = bt_page (bt, right->pool, right->page_no);
1795         else
1796                 return 0;
1797
1798         if( right->page->kill )
1799                 return bt->err = BTERR_struct;
1800
1801         // pull contents of right peer into our empty page
1802
1803         memcpy (set->page, right->page, bt->mgr->page_size);
1804
1805         // cache copy of key to update
1806
1807         fence = set->page->root;
1808         slot = set->page->root;
1809
1810         while( slot = slotptr(set->page,slot)->right )
1811                 fence = slot;
1812
1813         ptr = keyptr(right->page, fence);
1814         memcpy (higherfence, ptr, ptr->len + 1);
1815
1816         // mark right page deleted and point it to left page
1817         //      until we can post parent updates
1818
1819         bt_putid (right->page->right, set->page_no);
1820         right->page->kill = 1;
1821
1822         bt_lockpage (BtLockParent, right->latch);
1823         bt_unlockpage (BtLockWrite, right->latch);
1824
1825         bt_lockpage (BtLockParent, set->latch);
1826         bt_unlockpage (BtLockWrite, set->latch);
1827
1828         // redirect higher key directly to our new node contents
1829
1830         stopper = !bt_getid (set->page->right);
1831         bt_putid (value, set->page_no);
1832
1833         if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId, stopper) )
1834           return bt->err;
1835
1836         //      delete old lower key to our node
1837
1838         if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1, 0) )
1839           return bt->err;
1840
1841         //      obtain delete and write locks to right node
1842
1843         bt_unlockpage (BtLockParent, right->latch);
1844         bt_lockpage (BtLockDelete, right->latch);
1845         bt_lockpage (BtLockWrite, right->latch);
1846         bt_freepage (bt, right);
1847
1848         bt_unlockpage (BtLockParent, set->latch);
1849         bt_unpinlatch (set->latch);
1850         bt_unpinpool (set->pool);
1851         bt->found = found;
1852         return 0;
1853 }
1854
1855 //      find key in leaf level and return number of value bytes
1856 //      or (-1) if not found
1857
1858 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1859 {
1860 BtPageSet set[1];
1861 BtPathStk path[1];
1862 uint  slot;
1863 BtKey ptr;
1864 BtVal val;
1865 int ret;
1866
1867         if( !(slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead, path, 0)) )
1868                 return -1;
1869
1870         // if key exists, return TRUE
1871         //      otherwise return FALSE
1872
1873         if( !slotptr(set->page, slot)->dead && !path->entry[path->lvl].cmp ) {
1874                 val = valptr (set->page,slot);
1875                 if( valmax > val->len )
1876                         valmax = val->len;
1877                 memcpy (value, val->value, valmax);
1878                 ret = valmax;
1879         } else
1880                 ret = -1;
1881
1882         bt_unlockpage (BtLockRead, set->latch);
1883         bt_unpinlatch (set->latch);
1884         bt_unpinpool (set->pool);
1885         return ret;
1886 }
1887
1888 //      left rotate parent node
1889
1890 void bt_leftrotate (BtPage page, uint slot, BtSlot *parent, int cmp)
1891 {
1892 BtSlot *x = slotptr(page,slot);
1893 uint right = x->right;
1894 BtSlot *y = slotptr(page,right);
1895
1896         x->right = y->left;
1897
1898         if( !parent ) //        is x the root node?
1899                 page->root = right;
1900         else if( cmp == 1 )
1901                 parent->left = right;
1902         else
1903                 parent->right = right;
1904
1905         y->left = slot;
1906 }
1907
1908 //      right rotate parent node
1909
1910 void bt_rightrotate (BtPage page, uint slot, BtSlot *parent, int cmp)
1911 {
1912 BtSlot *x = slotptr(page,slot);
1913 uint left = x->left;
1914 BtSlot *y = slotptr(page,left);
1915
1916         x->left = y->right;
1917
1918         if( !parent ) //        is y the root node?
1919                 page->root = left;
1920         else if( cmp == 1 )
1921                 parent->left = left;
1922         else
1923                 parent->right = left;
1924
1925         y->right = slot;
1926 }
1927
1928 //      delete fence slot from rbtree at path point
1929 //      return with new fence slot
1930
1931 uint bt_rbremovefence (BtPage page, uint slot, BtPathStk *path)
1932 {
1933 BtSlot *node = slotptr (page,slot), *parent, *sibling, *grand;
1934 uint red = node->red, lvl, fence, idx, left;
1935
1936         if( lvl =  path->lvl ) {
1937                 parent = slotptr(page,path->entry[lvl - 1].slot);
1938                 parent->right = node->left;
1939         } else
1940                 page->root = node->left;
1941
1942         if( fence = node->left )
1943                 node = slotptr(page,fence);
1944         else {
1945                 parent->fence = 1;
1946                 return path->entry[--path->lvl].slot;
1947         }
1948
1949         //  extend path to new fence
1950
1951         path->entry[page->lvl].slot = fence;
1952
1953         while( slot = slotptr(page, fence)->right )
1954                 path->entry[++page->lvl].slot = fence = slot;
1955
1956         slotptr(page,fence)->fence = 1;
1957
1958         //      fixup colors
1959
1960         if( !red )
1961          while( !node->red && lvl ) {
1962                 left = parent->left;
1963                 sibling = slotptr(page,left);
1964                 if( sibling->red ) {
1965                   sibling->red = 0;
1966                   parent->red = 1;
1967                   if( lvl > 1 )
1968                         grand = slotptr(page,path->entry[lvl-2].slot);
1969                   else
1970                         grand = NULL;
1971                   bt_rightrotate(page, path->entry[lvl-1].slot, grand, -1);
1972                   sibling = slotptr(page,parent->left);
1973
1974                   for( idx = ++path->lvl; idx > lvl - 1; idx-- )
1975                         path->entry[idx].slot = path->entry[idx-1].slot;
1976
1977                   path->entry[idx].slot = left; 
1978                 }
1979
1980                 if( !sibling->right || !slotptr(page,sibling->right)->red )
1981                   if( !sibling->left || !slotptr(page,sibling->left)->red ) {
1982                         sibling->red = 1;
1983                         node = parent;
1984                         parent = grand;
1985                         lvl--;
1986                         continue;
1987                   }
1988
1989                 if( !sibling->left || !slotptr(page,sibling->left)->red ) {
1990                         if( sibling->right )
1991                           slotptr(page,sibling->right)->red = 0;
1992
1993                         sibling->red = 1;
1994                         bt_leftrotate (page, parent->left, parent, 1);
1995                         sibling = slotptr(page,parent->left);
1996                 }
1997
1998                 slotptr(page, sibling->left)->red = 0;
1999                 sibling->red = parent->red;
2000                 parent->red = 0;
2001                 bt_rightrotate(page, path->entry[lvl-1].slot, grand, -1);
2002                 break;
2003          }
2004
2005         slotptr(page,page->root)->red = 0;
2006         return fence;
2007 }
2008
2009 //      insert slot into rbtree at path point
2010
2011 void bt_rbinsert (BtPage page, uint slot, BtPathStk *path)
2012 {
2013 BtSlot *parent = slotptr(page,path->entry[path->lvl].slot);
2014 BtSlot *uncle, *grand;
2015 int lvl = path->lvl;
2016
2017         if( path->entry[lvl].cmp == 1 )
2018                 parent->left = slot;
2019         else
2020                 parent->right = slot;
2021
2022         slotptr(page,slot)->red = 1;
2023
2024         while( lvl > 0 && parent->red ) {
2025           grand = slotptr(page,path->entry[lvl-1].slot);
2026
2027           if( path->entry[lvl-1].cmp == 1 ) { // was grandparent left followed?
2028                 uncle = slotptr(page,grand->right);
2029                 if( grand->right && uncle->red ) {
2030                   parent->red = 0;
2031                   uncle->red = 0;
2032                   grand->red = 1;
2033
2034                   // move to grandparent & its parent (if any)
2035
2036                   slot = path->entry[--lvl].slot;
2037                   if( !lvl )
2038                         break;
2039                   parent = slotptr(page,path->entry[--lvl].slot);
2040                   continue;
2041                 }
2042
2043                 // was the parent right link followed?
2044                 // if so, left rotate parent
2045
2046                 if( path->entry[lvl].cmp == -1 ) {
2047                   bt_leftrotate(page, path->entry[lvl].slot, grand, path->entry[lvl-1].cmp);
2048                   parent = slotptr(page,slot);  // slot was rotated to parent
2049                 }
2050
2051                 parent->red = 0;
2052                 grand->red = 1;
2053
2054                 //      get pointer to grandparent's parent
2055
2056                 if( lvl>1 )
2057                 grand = slotptr(page,path->entry[lvl-2].slot);
2058                 else
2059                         grand = NULL;
2060
2061                 //  right rotate the grandparent slot
2062
2063                 slot = path->entry[lvl-1].slot;
2064                 bt_rightrotate(page, slot, grand, path->entry[lvl-2].cmp);
2065                 return;
2066           } else {      // symmetrical case
2067                 uncle = slotptr(page,grand->left);
2068                 if( grand->left && uncle->red ) {
2069                   uncle->red = 0;
2070                   parent->red = 0;
2071                   grand->red = 1;
2072
2073                   // move to grandparent & its parent (if any)
2074                   slot = path->entry[--lvl].slot;
2075                   if( !lvl )
2076                         break;
2077                   parent = slotptr(page,path->entry[--lvl].slot);
2078                   continue;
2079                 }
2080
2081                 // was the parent left link followed?
2082                 // if so, right rotate parent
2083
2084                 if( path->entry[lvl].cmp == 1 ) {
2085                   bt_rightrotate(page, path->entry[lvl].slot, grand, path->entry[lvl-1].cmp);
2086                   parent = slotptr(page,slot);  // slot was rotated to parent
2087                 }
2088
2089                 parent->red = 0;
2090                 grand->red = 1;
2091
2092                 //      get pointer to grandparent's parent
2093
2094                 if( lvl>1 )
2095                 grand = slotptr(page,path->entry[lvl-2].slot);
2096                 else
2097                         grand = NULL;
2098
2099                 //  left rotate the grandparent slot
2100
2101                 slot = path->entry[lvl-1].slot;
2102                 bt_leftrotate(page, slot, grand, path->entry[lvl-2].cmp);
2103                 return;
2104           }
2105         }
2106
2107         //      reset root color
2108
2109         slotptr(page,page->root)->red = 0;
2110 }
2111
2112 //      transfer a slot from one page to another
2113
2114 void bt_xfrslot (BtPage page, BtPage src, uint slot, BtPathStk *path, uint copykey)
2115 {
2116 BtKey key = keyptr(src,slot);
2117 BtVal val = valptr(src,slot);
2118 BtSlot *node;
2119
2120         // calculate next available slot and copy key into page
2121
2122   if( copykey ) {
2123         page->min -= val->len + 1; // reset lowest used offset
2124         ((unsigned char *)page)[page->min] = val->len;
2125         memcpy ((unsigned char *)page + page->min +1, val->value, val->len );
2126
2127         page->min -= key->len + 1; // reset lowest used offset
2128         ((unsigned char *)page)[page->min] = key->len;
2129         memcpy ((unsigned char *)page + page->min +1, key->key, key->len );
2130   }
2131
2132   node = slotptr(page, ++page->cnt);
2133   node->off = copykey ? page->min : slotptr(src,slot)->off;
2134   node->fence = slotptr(src,slot)->fence;
2135   node->dead = slotptr(src,slot)->dead;
2136
2137   page->act++;
2138   
2139   if( path->lvl < 0 ) {
2140         page->root = page->cnt;
2141         return;
2142   }
2143
2144   bt_rbinsert (page, page->cnt, path);
2145 }
2146
2147 //      copy keys across into a binomial tree
2148
2149 void bt_copykeys (BtPage page, uint slot, BtPage frame, uint *que, int lvl)
2150 {
2151 BtSlot *node = slotptr(page,slot);
2152 BtKey key = ((BtKey)((unsigned char*)(frame) + node->off));
2153 BtVal val = ((BtVal)(key->key + key->len));
2154 uint off, nxt = page->min;
2155 uint right = node->right;
2156 uint left = node->left;
2157
2158         // copy value
2159
2160         nxt -= val->len + 1;
2161         ((unsigned char *)page)[nxt] = val->len;
2162         memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
2163
2164         //      copy key
2165
2166         nxt -= key->len + 1;
2167         memcpy ((unsigned char *)page + nxt, key, key->len + 1);
2168         node->off = nxt;
2169         page->min = nxt;
2170
2171         //      punt if group of keys has filled a 4K VM block
2172         //      which we determine by the number of red/black
2173         //      levels have been copied across.
2174
2175         if( lvl > BT_binomial ) {
2176           if( left )
2177                 que[++(*que)] = left;
2178           if( right )
2179                 que[++(*que)] = right;
2180           return;
2181         }
2182
2183         if( left )
2184           bt_copykeys (page, left, frame, que, lvl+1);
2185
2186         if( right )
2187           bt_copykeys (page, right, frame, que, lvl+1);
2188 }
2189
2190 //      clean page and rebuild red-black tree
2191 //      return 0 - page needs splitting
2192 //      >0  cleanup done, try again
2193
2194 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint vallen)
2195 {
2196 uint max = page->cnt;
2197 BtPathStk path[1];
2198 uint cnt, slot;
2199 BtSlot *node;
2200 uid right;
2201 BtKey key;
2202 BtVal val;
2203
2204         //      skip cleanup if nothing to reclaim
2205
2206         if( !page->dirty )
2207                 return 0;
2208
2209         memcpy (bt->frame, page, bt->mgr->page_size);
2210
2211         // skip page info and set rest of page to zero
2212
2213         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
2214         page->min = bt->mgr->page_size;
2215         page->dirty = 0;
2216         page->root = 0;
2217         page->cnt = 0;
2218         page->act = 0;
2219
2220         // try cleaning up page first
2221         // by removing deleted keys
2222         //      from the heap
2223
2224         right = bt_getid (bt->frame->right);
2225         slot = 0;
2226
2227         while( ++slot <= max ) {
2228                 node = slotptr(bt->frame,slot);
2229
2230                 if( !node->off )
2231                         continue;
2232
2233                 if( page->lvl || !node->fence )
2234                    if( node->dead )
2235                         continue;
2236
2237                 // xfr the slot to the page b-heap using keys from bt->frame
2238
2239                 key = keyptr(bt->frame, slot);
2240                 bt_findslot (page, bt->frame, key->key, key->len, path, node->fence && !right);
2241                 bt_xfrslot (page, bt->frame, slot, path, 0);
2242         }
2243
2244         // now copy keys & values across from bt->frame to page
2245         //      in blocks of BT_binomial tree levels
2246
2247         page->min = bt->mgr->page_size;
2248         bt->que[1] = page->root;
2249         *bt->que = 1;
2250         cnt = 0;
2251
2252         do bt_copykeys (page, bt->que[++cnt], bt->frame, bt->que, 0);
2253         while( cnt < *bt->que );
2254
2255         if( page->min < (page->cnt+1) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
2256                 return 0;
2257
2258         return 1;
2259 }
2260
2261 // split the root and raise the height of the btree
2262
2263 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
2264 {
2265 uint nxt = bt->mgr->page_size;
2266 unsigned char value[BtId];
2267 BtSlot *node;
2268 uid left;
2269
2270         //  Obtain an empty page to use, and copy the current
2271         //  root contents into it, e.g. lower keys
2272
2273         if( !(left = bt_newpage(bt, root->page)) )
2274                 return bt->err;
2275
2276         // preserve the page info at the bottom
2277         // of higher keys and set rest to zero
2278
2279         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
2280
2281         // insert lower keys page fence key on newroot page as first key
2282
2283         nxt -= BtId + 1;
2284         bt_putid (value, left);
2285         ((unsigned char *)root->page)[nxt] = BtId;
2286         memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
2287
2288         nxt -= *leftkey + 1;
2289         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
2290         node = slotptr(root->page, 1);
2291         node->off = nxt;
2292         node->red = 1;
2293         
2294         // insert stopper key on newroot page
2295         // and increase the root height
2296
2297         nxt -= 3 + BtId + 1;
2298         ((unsigned char *)root->page)[nxt] = 2;
2299         ((unsigned char *)root->page)[nxt+1] = 0xff;
2300         ((unsigned char *)root->page)[nxt+2] = 0xff;
2301
2302         bt_putid (value, page_no2);
2303         ((unsigned char *)root->page)[nxt+3] = BtId;
2304         memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
2305         node = slotptr(root->page, 2);
2306         node->fence = 1;
2307         node->off = nxt;
2308         node->left = 1;
2309
2310         bt_putid(root->page->right, 0);
2311         root->page->min = nxt;          // reset lowest used offset and key count
2312         root->page->root = 2;
2313         root->page->cnt = 2;
2314         root->page->act = 2;
2315         root->page->lvl++;
2316
2317         // release and unpin root
2318
2319         bt_unlockpage(BtLockWrite, root->latch);
2320         bt_unpinlatch (root->latch);
2321         bt_unpinpool (root->pool);
2322         return 0;
2323 }
2324
2325 //      copy sub-tree from one node to the root of another
2326 //      return slot number in destination
2327
2328 uint bt_copysubtree (BtDb *bt, BtPage dest, BtPage src, uint idx, uint parent, uint off)
2329 {
2330 BtSlot *node = slotptr(src, idx);
2331 uint child, slot;
2332
2333         slot = parent * 2 + off;
2334
2335         if( slot > bt->base )
2336                 slot = ++dest->cnt;
2337
2338         *slotptr(dest, slot) = *node;
2339         dest->act++;
2340
2341         if( child = node->left )
2342           child = bt_copysubtree (bt, dest, src, child, slot, 0);
2343
2344         slotptr(dest, slot)->left = child;
2345
2346         if( child = node->right )
2347           child = bt_copysubtree (bt, dest, src, child, slot, 1);
2348
2349         slotptr(dest, slot)->right = child;
2350         return slot;
2351 }
2352
2353 //  split already locked full node
2354 //      return unlocked.
2355
2356 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
2357 {
2358 unsigned char fencekey[256], rightkey[256];
2359 unsigned char value[BtId];
2360 uint lvl = set->page->lvl;
2361 uint prev, fence, stopper;
2362 BtPageSet right[1];
2363 BtPathStk path[1];
2364 uint cnt, slot;
2365 BtSlot *node;
2366 BtKey key;
2367 BtVal val;
2368
2369         //  split higher half of keys to bt->frame
2370
2371         slot = slotptr(set->page, set->page->root)->right;
2372         memset (bt->frame, 0, bt->mgr->page_size);
2373         bt->base = set->page->cnt / 2;
2374         bt->frame->cnt = bt->base;
2375
2376         bt->frame->root = bt_copysubtree (bt, bt->frame, set->page, slot, 0, 1);
2377         bt->frame->lvl = set->page->lvl;
2378
2379         // now copy keys & values across from page to bt->frame
2380         //      in blocks of BT_binomial tree levels
2381
2382         slotptr(bt->frame,bt->frame->root)->red = 0;
2383         bt->frame->min = bt->mgr->page_size;
2384         bt->que[1] = bt->frame->root;
2385         *bt->que = 1;
2386         cnt = 0;
2387
2388         do bt_copykeys (bt->frame, bt->que[++cnt], set->page, bt->que, 0);
2389         while( cnt < *bt->que );
2390
2391         // remember existing fence key for new page to the right
2392
2393         fence = set->page->root;
2394         slot = set->page->root;
2395
2396         while( slot = slotptr(set->page,slot)->right )
2397                 fence = slot;
2398
2399         key = keyptr(set->page,fence);
2400         memcpy (rightkey, key, key->len + 1);
2401
2402         bt->frame->bits = bt->mgr->page_bits;
2403         bt->frame->lvl = lvl;
2404
2405         // link right node
2406
2407         if( set->page_no > ROOT_page )
2408                 memcpy (bt->frame->right, set->page->right, BtId);
2409
2410         stopper = !bt_getid (bt->frame->right);
2411
2412         //      get new free page and write higher keys in bt->frame to it.
2413
2414         if( !(right->page_no = bt_newpage(bt, bt->frame)) )
2415                 return bt->err;
2416
2417         //      update lower keys to continue in old page
2418
2419         memcpy (bt->frame, set->page, bt->mgr->page_size);
2420         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2421         slot = slotptr(bt->frame, bt->frame->root)->left;
2422         set->page->cnt = bt->base;
2423         set->page->dirty = 0;
2424         set->page->act = 0;
2425
2426         //  assemble page of smaller keys in set->page
2427
2428         set->page->root = bt_copysubtree (bt, set->page, bt->frame, slot, 0, 1);
2429         set->page->min = bt->mgr->page_size;
2430
2431         bt->que[1] = set->page->root;
2432         *bt->que = 1;
2433         cnt = 0;
2434
2435         do bt_copykeys (set->page, bt->que[++cnt], bt->frame, bt->que, 0);
2436         while( cnt < *bt->que );
2437
2438         bt_putid(set->page->right, right->page_no);
2439
2440         //      translate old r/b root as new left fence
2441
2442         key = keyptr(bt->frame,bt->frame->root);
2443         bt_findslot (set->page, set->page, key->key, key->len, path, 0);
2444         bt_xfrslot (set->page, bt->frame, bt->frame->root, path, 1);
2445
2446         node = slotptr(set->page,set->page->cnt);
2447         memcpy(fencekey, key, key->len + 1);
2448         node->fence = 1;
2449
2450         // if current page is the root page, split it
2451
2452         if( set->page_no == ROOT_page )
2453                 return bt_splitroot (bt, set, fencekey, right->page_no);
2454
2455         // insert new fences in their parent pages
2456
2457         right->latch = bt_pinlatch (bt, right->page_no);
2458         bt_lockpage (BtLockParent, right->latch);
2459
2460         bt_lockpage (BtLockParent, set->latch);
2461         bt_unlockpage (BtLockWrite, set->latch);
2462
2463         // insert new fence for reformulated left block of smaller keys
2464
2465         bt_putid (value, set->page_no);
2466
2467         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId, 0) )
2468                 return bt->err;
2469
2470         // switch fence for right block of larger keys to new right page
2471
2472         bt_putid (value, right->page_no);
2473
2474         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId, stopper) )
2475                 return bt->err;
2476
2477         bt_unlockpage (BtLockParent, set->latch);
2478         bt_unpinlatch (set->latch);
2479         bt_unpinpool (set->pool);
2480
2481         bt_unlockpage (BtLockParent, right->latch);
2482         bt_unpinlatch (right->latch);
2483         return 0;
2484 }
2485 //  Insert new key into the btree at given level.
2486
2487 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint stopper)
2488 {
2489 BtPathStk path[1];
2490 BtPageSet set[1];
2491 uint slot, idx;
2492 BtSlot *node;
2493 uint reuse;
2494 BtKey ptr;
2495 BtVal val;
2496
2497         while( 1 ) {
2498                 if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite, path, stopper) )
2499                         node = slotptr(set->page, slot);
2500                 else {
2501                         if( !bt->err )
2502                                 bt->err = BTERR_ovflw;
2503                         return bt->err;
2504                 }
2505
2506                 // if key already exists, update id and return
2507
2508                 if( reuse = !path->entry[path->lvl].cmp )
2509                   if( val = valptr(set->page, slot), val->len >= vallen ) {
2510                         if( node->dead )
2511                                 set->page->act++;
2512                         node->dead = 0;
2513                         val->len = vallen;
2514                         memcpy (val->value, value, vallen);
2515                         bt_unlockpage(BtLockWrite, set->latch);
2516                         bt_unpinlatch (set->latch);
2517                         bt_unpinpool (set->pool);
2518                         return 0;
2519                   } else {
2520                         if( !node->dead )
2521                                 set->page->act--;
2522                         set->page->dirty = 1;
2523                         node->dead = 1;
2524                   }
2525
2526                 // check if page has enough space
2527
2528                 if( set->page->min >= (set->page->cnt+1) * sizeof(BtSlot) + sizeof(*set->page) + keylen + 1 + vallen + 1)
2529                         break;
2530
2531                 if( bt_cleanpage (bt, set->page, keylen, vallen) ) {
2532                         bt_unlockpage (BtLockWrite, set->latch);
2533                         bt_unpinlatch (set->latch);
2534                         bt_unpinpool (set->pool);
2535                         continue;       // find new slot number
2536                 }
2537
2538                 if( bt_splitpage (bt, set) )
2539                         return bt->err;
2540         }
2541
2542         // calculate next available slot and copy key into page
2543
2544         set->page->min -= vallen + 1; // reset lowest used offset
2545         ((unsigned char *)set->page)[set->page->min] = vallen;
2546         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2547
2548         set->page->min -= keylen + 1; // reset lowest used offset
2549         ((unsigned char *)set->page)[set->page->min] = keylen;
2550         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2551
2552         set->page->act++;
2553
2554         if( !reuse ) {
2555                 slot = ++set->page->cnt;
2556                 node = slotptr(set->page,slot);
2557         }
2558
2559         node->off = set->page->min;
2560         node->dead = 0;
2561
2562         if( !reuse )
2563                 bt_rbinsert (set->page, slot, path);
2564
2565         bt_unlockpage (BtLockWrite, set->latch);
2566         bt_unpinlatch (set->latch);
2567         bt_unpinpool (set->pool);
2568         return 0;
2569 }
2570
2571 BTERR bt_startpage (BtDb *bt, uid page_no)
2572 {
2573 BtPageSet set[1];
2574 BtSlot *node;
2575 uint slot;
2576
2577         if( set->pool = bt_pinpool (bt, page_no) )
2578                 set->page = bt_page (bt, set->pool, page_no);
2579         else
2580                 return 0;
2581
2582         set->latch = bt_pinlatch (bt, page_no);
2583     bt_lockpage(BtLockRead, set->latch);
2584
2585         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2586
2587         bt_unlockpage(BtLockRead, set->latch);
2588         bt_unpinlatch (set->latch);
2589         bt_unpinpool (set->pool);
2590
2591         slot = bt->cursor->root;
2592         bt->path->lvl = -1;
2593
2594         do {
2595                 bt->path->entry[++bt->path->lvl].slot = slot;
2596                 bt->path->entry[bt->path->lvl].cmp = 1;
2597                 slot = slotptr(bt->cursor, slot)->left;
2598         } while( slot && bt->path->lvl < BT_maxbits );
2599
2600         slot =  bt->path->entry[bt->path->lvl].slot;
2601
2602         while( slot && slotptr(bt->cursor,slot)->dead )
2603                 slot = bt_nextkey (bt);
2604
2605         return slot;
2606 }
2607
2608 //  cache page of keys into cursor and return starting slot for given key
2609
2610 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2611 {
2612 BtPageSet set[1];
2613 uint slot;
2614
2615         // cache page for retrieval
2616
2617         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead, bt->path, 0) )
2618           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2619         else
2620           return 0;
2621
2622         bt_unlockpage(BtLockRead, set->latch);
2623         bt_unpinlatch (set->latch);
2624         bt_unpinpool (set->pool);
2625
2626         while( bt->path->lvl && bt->path->entry[bt->path->lvl - 1].cmp < 0 )
2627           slot = bt->path->entry[--bt->path->lvl].slot;
2628
2629         return slot;
2630 }
2631
2632 //  return next slot for cursor page
2633 //  or slide cursor right into next page
2634
2635 uint bt_nextkey (BtDb *bt)
2636 {
2637 uid right;
2638 uint slot;
2639
2640   while( slot = bt_nextslot (bt->cursor, bt->path) )
2641         if( slotptr(bt->cursor,slot)->dead )
2642           continue;
2643         else
2644           return slot;
2645
2646   if( right = bt_getid(bt->cursor->right) )
2647         return bt_startpage (bt, right);
2648
2649   return bt->err = 0;
2650 }
2651
2652 BtKey bt_key(BtDb *bt, uint slot)
2653 {
2654         return keyptr(bt->cursor, slot);
2655 }
2656
2657 BtVal bt_val(BtDb *bt, uint slot)
2658 {
2659         return valptr(bt->cursor,slot);
2660 }
2661
2662 #ifdef STANDALONE
2663
2664 #ifndef unix
2665 double getCpuTime(int type)
2666 {
2667 FILETIME crtime[1];
2668 FILETIME xittime[1];
2669 FILETIME systime[1];
2670 FILETIME usrtime[1];
2671 SYSTEMTIME timeconv[1];
2672 double ans = 0;
2673
2674         memset (timeconv, 0, sizeof(SYSTEMTIME));
2675
2676         switch( type ) {
2677         case 0:
2678                 GetSystemTimeAsFileTime (xittime);
2679                 FileTimeToSystemTime (xittime, timeconv);
2680                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2681                 break;
2682         case 1:
2683                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2684                 FileTimeToSystemTime (usrtime, timeconv);
2685                 break;
2686         case 2:
2687                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2688                 FileTimeToSystemTime (systime, timeconv);
2689                 break;
2690         }
2691
2692         ans += (double)timeconv->wHour * 3600;
2693         ans += (double)timeconv->wMinute * 60;
2694         ans += (double)timeconv->wSecond;
2695         ans += (double)timeconv->wMilliseconds / 1000;
2696         return ans;
2697 }
2698 #else
2699 #include <time.h>
2700 #include <sys/resource.h>
2701
2702 double getCpuTime(int type)
2703 {
2704 struct rusage used[1];
2705 struct timeval tv[1];
2706
2707         switch( type ) {
2708         case 0:
2709                 gettimeofday(tv, NULL);
2710                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2711
2712         case 1:
2713                 getrusage(RUSAGE_SELF, used);
2714                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2715
2716         case 2:
2717                 getrusage(RUSAGE_SELF, used);
2718                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2719         }
2720
2721         return 0;
2722 }
2723 #endif
2724
2725 uint bt_latchaudit (BtDb *bt)
2726 {
2727 ushort idx, hashidx;
2728 uid next, page_no;
2729 BtLatchSet *latch;
2730 uint cnt = 0;
2731 BtKey ptr;
2732
2733 #ifdef unix
2734         posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2735 #endif
2736         if( *(ushort *)(bt->mgr->latchmgr->lock) )
2737                 fprintf(stderr, "Alloc page locked\n");
2738         *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2739
2740         for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2741                 latch = bt->mgr->latchsets + idx;
2742                 if( *latch->readwr->rin & MASK )
2743                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2744                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2745
2746                 if( *latch->access->rin & MASK )
2747                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2748                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2749
2750                 if( *latch->parent->rin & MASK )
2751                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2752                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2753
2754                 if( latch->pin ) {
2755                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2756                         latch->pin = 0;
2757                 }
2758         }
2759
2760         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2761           if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2762                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2763
2764           *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2765
2766           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2767                 latch = bt->mgr->latchsets + idx;
2768                 if( *(ushort *)latch->busy )
2769                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2770                 *(ushort *)latch->busy = 0;
2771                 if( latch->pin )
2772                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2773           } while( idx = latch->next );
2774         }
2775
2776         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2777         page_no = LEAF_page;
2778
2779         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2780         off64_t off = page_no << bt->mgr->page_bits;
2781 #ifdef unix
2782                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2783 #else
2784                 DWORD amt[1];
2785
2786                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2787
2788                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2789                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2790
2791                   if( *amt <  bt->mgr->page_size )
2792                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2793 #endif
2794                 if( !bt->frame->free )
2795                  if( !bt->frame->lvl )
2796                         cnt += bt->frame->act;
2797
2798                 if( page_no > LEAF_page )
2799                         next = page_no + 1;
2800                 page_no = next;
2801         }
2802         return cnt - 1;
2803 }
2804
2805 typedef struct {
2806         char idx;
2807         char *type;
2808         char *infile;
2809         BtMgr *mgr;
2810         int num;
2811 } ThreadArg;
2812
2813 //  standalone program to index file of keys
2814 //  then list them onto std-out
2815
2816 #ifdef unix
2817 void *index_file (void *arg)
2818 #else
2819 uint __stdcall index_file (void *arg)
2820 #endif
2821 {
2822 int line = 0, found = 0, cnt = 0;
2823 uid next, page_no = LEAF_page;  // start on first page of leaves
2824 unsigned char key[256];
2825 ThreadArg *args = arg;
2826 int ch, len = 0, slot;
2827 BtKey ptr;
2828 BtDb *bt;
2829 FILE *in;
2830
2831         bt = bt_open (args->mgr);
2832
2833         switch(args->type[0] | 0x20)
2834         {
2835         case 'a':
2836                 fprintf(stderr, "started latch mgr audit\n");
2837                 cnt = bt_latchaudit (bt);
2838                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2839                 break;
2840
2841         case 'p':
2842                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2843                 if( in = fopen (args->infile, "rb") )
2844                   while( ch = getc(in), ch != EOF )
2845                         if( ch == '\n' )
2846                         {
2847                           line++;
2848
2849                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 0) )
2850                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2851                           len = 0;
2852                         }
2853                         else if( len < 255 )
2854                                 key[len++] = ch;
2855                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2856                 break;
2857
2858         case 'w':
2859                 fprintf(stderr, "started indexing for %s\n", args->infile);
2860                 if( in = fopen (args->infile, "rb") )
2861                   while( ch = getc(in), ch != EOF )
2862                         if( ch == '\n' )
2863                         {
2864                           line++;
2865
2866                           if( args->num == 1 )
2867                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2868
2869                           else if( args->num )
2870                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2871
2872                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 0) )
2873                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2874                           len = 0;
2875                         }
2876                         else if( len < 255 )
2877                                 key[len++] = ch;
2878                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2879                 break;
2880
2881         case 'd':
2882                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2883                 if( in = fopen (args->infile, "rb") )
2884                   while( ch = getc(in), ch != EOF )
2885                         if( ch == '\n' )
2886                         {
2887                           line++;
2888                           if( args->num == 1 )
2889                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2890
2891                           else if( args->num )
2892                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2893
2894                           if( bt_deletekey (bt, key, len, 0, 0) )
2895                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2896                           len = 0;
2897                         }
2898                         else if( len < 255 )
2899                                 key[len++] = ch;
2900                 fprintf(stderr, "finished %s for keys, %d \n", args->infile, line);
2901                 break;
2902
2903         case 'f':
2904                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2905                 if( in = fopen (args->infile, "rb") )
2906                   while( ch = getc(in), ch != EOF )
2907                         if( ch == '\n' )
2908                         {
2909                           line++;
2910                           if( args->num == 1 )
2911                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2912
2913                           else if( args->num )
2914                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2915
2916                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2917                                 found++;
2918                           else if( bt->err )
2919                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2920                           len = 0;
2921                         }
2922                         else if( len < 255 )
2923                                 key[len++] = ch;
2924                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2925                 break;
2926
2927         case 's':
2928                 fprintf(stderr, "started scanning\n");
2929
2930                 if( slot = bt_startpage (bt, LEAF_page) )
2931                   do {
2932                         ptr = keyptr(bt->cursor, slot);
2933                         fwrite (ptr->key, ptr->len, 1, stdout);
2934                         fputc ('\n', stdout);
2935                         cnt++;
2936                   } while( slot = bt_nextkey (bt) );
2937
2938                 fprintf(stderr, " Total keys read %d\n", cnt);
2939                 break;
2940
2941         case 'c':
2942 #ifdef unix
2943                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2944 #endif
2945                 fprintf(stderr, "started counting\n");
2946                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2947                 page_no = LEAF_page;
2948
2949                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2950                 uid off = page_no << bt->mgr->page_bits;
2951 #ifdef unix
2952                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2953 #else
2954                 DWORD amt[1];
2955
2956                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2957
2958                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2959                         return bt->err = BTERR_map;
2960
2961                   if( *amt <  bt->mgr->page_size )
2962                         return bt->err = BTERR_map;
2963 #endif
2964                         if( !bt->frame->free && !bt->frame->lvl )
2965                                 cnt += bt->frame->act;
2966                         if( page_no > LEAF_page )
2967                                 next = page_no + 1;
2968                         page_no = next;
2969                 }
2970                 
2971                 fprintf(stderr, " Total keys read %d\n", cnt);
2972                 break;
2973         }
2974
2975         bt_close (bt);
2976 #ifdef unix
2977         return NULL;
2978 #else
2979         return 0;
2980 #endif
2981 }
2982
2983 typedef struct timeval timer;
2984
2985 int main (int argc, char **argv)
2986 {
2987 int idx, cnt, len, slot, err;
2988 int segsize, bits = 16;
2989 double start, stop;
2990 #ifdef unix
2991 pthread_t *threads;
2992 #else
2993 HANDLE *threads;
2994 #endif
2995 ThreadArg *args;
2996 uint poolsize = 0;
2997 float elapsed;
2998 int num = 0;
2999 char key[1];
3000 BtMgr *mgr;
3001 BtKey ptr;
3002 BtDb *bt;
3003
3004         if( argc < 3 ) {
3005                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
3006                 fprintf (stderr, "  where page_bits is the page size in bits\n");
3007                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
3008                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
3009                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
3010                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3011                 exit(0);
3012         }
3013
3014         start = getCpuTime(0);
3015
3016         if( argc > 3 )
3017                 bits = atoi(argv[3]);
3018
3019         if( argc > 4 )
3020                 poolsize = atoi(argv[4]);
3021
3022         if( !poolsize )
3023                 fprintf (stderr, "Warning: no mapped_pool\n");
3024
3025         if( poolsize > 65535 )
3026                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
3027
3028         if( argc > 5 )
3029                 segsize = atoi(argv[5]);
3030         else
3031                 segsize = 4;    // 16 pages per mmap segment
3032
3033         if( argc > 6 )
3034                 num = atoi(argv[6]);
3035
3036         cnt = argc - 7;
3037 #ifdef unix
3038         threads = malloc (cnt * sizeof(pthread_t));
3039 #else
3040         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3041 #endif
3042         args = malloc (cnt * sizeof(ThreadArg));
3043
3044         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
3045
3046         if( !mgr ) {
3047                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3048                 exit (1);
3049         }
3050
3051         //      fire off threads
3052
3053         for( idx = 0; idx < cnt; idx++ ) {
3054                 args[idx].infile = argv[idx + 7];
3055                 args[idx].type = argv[2];
3056                 args[idx].mgr = mgr;
3057                 args[idx].num = num;
3058                 args[idx].idx = idx;
3059 #ifdef unix
3060                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3061                         fprintf(stderr, "Error creating thread %d\n", err);
3062 #else
3063                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3064 #endif
3065         }
3066
3067         //      wait for termination
3068
3069 #ifdef unix
3070         for( idx = 0; idx < cnt; idx++ )
3071                 pthread_join (threads[idx], NULL);
3072 #else
3073         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3074
3075         for( idx = 0; idx < cnt; idx++ )
3076                 CloseHandle(threads[idx]);
3077
3078 #endif
3079         elapsed = getCpuTime(0) - start;
3080         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3081         elapsed = getCpuTime(1);
3082         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3083         elapsed = getCpuTime(2);
3084         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3085
3086         bt_mgrclose (mgr);
3087 }
3088
3089 #endif  //STANDALONE