]> pd.if.org Git - btree/blob - threadskv3.c
Add finalized threadskv10g LSM btree.
[btree] / threadskv3.c
1 // btree version threadskv3 sched_yield version
2 //      with reworked bt_deletekey code
3 //      and phase-fair reader writer lock
4 //      and librarian page split code
5 // 08 SEP 2014
6
7 // author: karl malbrain, malbrain@cal.berkeley.edu
8
9 /*
10 This work, including the source code, documentation
11 and related data, is placed into the public domain.
12
13 The orginal author is Karl Malbrain.
14
15 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
16 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
17 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
18 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
19 RESULTING FROM THE USE, MODIFICATION, OR
20 REDISTRIBUTION OF THIS SOFTWARE.
21 */
22
23 // Please see the project home page for documentation
24 // code.google.com/p/high-concurrency-btree
25
26 #define _FILE_OFFSET_BITS 64
27 #define _LARGEFILE64_SOURCE
28
29 #ifdef linux
30 #define _GNU_SOURCE
31 #endif
32
33 #ifdef unix
34 #include <unistd.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <fcntl.h>
38 #include <sys/time.h>
39 #include <sys/mman.h>
40 #include <errno.h>
41 #include <pthread.h>
42 #else
43 #define WIN32_LEAN_AND_MEAN
44 #include <windows.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <time.h>
48 #include <fcntl.h>
49 #include <process.h>
50 #include <intrin.h>
51 #endif
52
53 #include <memory.h>
54 #include <string.h>
55 #include <stddef.h>
56
57 typedef unsigned long long      uid;
58
59 #ifndef unix
60 typedef unsigned long long      off64_t;
61 typedef unsigned short          ushort;
62 typedef unsigned int            uint;
63 #endif
64
65 #define BT_latchtable   128                                     // number of latch manager slots
66
67 #define BT_ro 0x6f72    // ro
68 #define BT_rw 0x7772    // rw
69
70 #define BT_maxbits              24                                      // maximum page size in bits
71 #define BT_minbits              9                                       // minimum page size in bits
72 #define BT_minpage              (1 << BT_minbits)       // minimum page size
73 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
74
75 /*
76 There are five lock types for each node in three independent sets: 
77 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
78 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
79 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
80 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
81 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
82 */
83
84 typedef enum{
85         BtLockAccess,
86         BtLockDelete,
87         BtLockRead,
88         BtLockWrite,
89         BtLockParent
90 } BtLock;
91
92 //      definition for phase-fair reader/writer lock implementation
93
94 typedef struct {
95         ushort rin[1];
96         ushort rout[1];
97         ushort ticket[1];
98         ushort serving[1];
99 } RWLock;
100
101 #define PHID 0x1
102 #define PRES 0x2
103 #define MASK 0x3
104 #define RINC 0x4
105
106 //      definition for spin latch implementation
107
108 // exclusive is set for write access
109 // share is count of read accessors
110 // grant write lock when share == 0
111
112 volatile typedef struct {
113         ushort exclusive:1;
114         ushort pending:1;
115         ushort share:14;
116 } BtSpinLatch;
117
118 #define XCL 1
119 #define PEND 2
120 #define BOTH 3
121 #define SHARE 4
122
123 //  hash table entries
124
125 typedef struct {
126         BtSpinLatch latch[1];
127         volatile ushort slot;           // Latch table entry at head of chain
128 } BtHashEntry;
129
130 //      latch manager table structure
131
132 typedef struct {
133         RWLock readwr[1];               // read/write page lock
134         RWLock access[1];               // Access Intent/Page delete
135         RWLock parent[1];               // Posting of fence key in parent
136         BtSpinLatch busy[1];            // slot is being moved between chains
137         volatile ushort next;           // next entry in hash table chain
138         volatile ushort prev;           // prev entry in hash table chain
139         volatile ushort pin;            // number of outstanding locks
140         volatile ushort hash;           // hash slot entry is under
141         volatile uid page_no;           // latch set page number
142 } BtLatchSet;
143
144 //      Define the length of the page and key pointers
145
146 #define BtId 6
147
148 //      Page key slot definition.
149
150 //      Keys are marked dead, but remain on the page until
151 //      it cleanup is called. The fence key (highest key) for
152 //      the page is always present, even after cleanup.
153
154 typedef struct {
155         uint off:BT_maxbits;            // page offset for key start
156         uint librarian:1;                       // set for librarian slot
157         uint dead:1;                            // set for deleted key
158 } BtSlot;
159
160 //      The key structure occupies space at the upper end of
161 //      each page.  It's a length byte followed by the key
162 //      bytes.
163
164 typedef struct {
165         unsigned char len;
166         unsigned char key[1];
167 } *BtKey;
168
169 //      the value structure also occupies space at the upper
170 //      end of the page.
171
172 typedef struct {
173         unsigned char len;
174         unsigned char value[1];
175 } *BtVal;
176
177 //      The first part of an index page.
178 //      It is immediately followed
179 //      by the BtSlot array of keys.
180
181 typedef struct BtPage_ {
182         uint cnt;                                       // count of keys in page
183         uint act;                                       // count of active keys
184         uint min;                                       // next key offset
185         uint garbage;                           // page garbage in bytes
186         unsigned char bits:7;           // page size in bits
187         unsigned char free:1;           // page is on free chain
188         unsigned char lvl:7;            // level of page
189         unsigned char kill:1;           // page is being deleted
190         unsigned char right[BtId];      // page number to right
191 } *BtPage;
192
193 //      The memory mapping pool table buffer manager entry
194
195 typedef struct {
196         uid  basepage;                          // mapped base page number
197         char *map;                                      // mapped memory pointer
198         ushort slot;                            // slot index in this array
199         ushort pin;                                     // mapped page pin counter
200         void *hashprev;                         // previous pool entry for the same hash idx
201         void *hashnext;                         // next pool entry for the same hash idx
202 #ifndef unix
203         HANDLE hmap;                            // Windows memory mapping handle
204 #endif
205 } BtPool;
206
207 #define CLOCK_bit 0x8000                // bit in pool->pin
208
209 //  The loadpage interface object
210
211 typedef struct {
212         uid page_no;            // current page number
213         BtPage page;            // current page pointer
214         BtPool *pool;           // current page pool
215         BtLatchSet *latch;      // current page latch set
216 } BtPageSet;
217
218 //      structure for latch manager on ALLOC_page
219
220 typedef struct {
221         struct BtPage_ alloc[1];        // next page_no in right ptr
222         unsigned char chain[BtId];      // head of free page_nos chain
223         BtSpinLatch lock[1];            // allocation area lite latch
224         ushort latchdeployed;           // highest number of latch entries deployed
225         ushort nlatchpage;                      // number of latch pages at BT_latch
226         ushort latchtotal;                      // number of page latch entries
227         ushort latchhash;                       // number of latch hash table slots
228         ushort latchvictim;                     // next latch entry to examine
229         BtHashEntry table[0];           // the hash table
230 } BtLatchMgr;
231
232 //      The object structure for Btree access
233
234 typedef struct {
235         uint page_size;                         // page size    
236         uint page_bits;                         // page size in bits    
237         uint seg_bits;                          // seg size in pages in bits
238         uint mode;                                      // read-write mode
239 #ifdef unix
240         int idx;
241 #else
242         HANDLE idx;
243 #endif
244         ushort poolcnt;                         // highest page pool node in use
245         ushort poolmax;                         // highest page pool node allocated
246         ushort poolmask;                        // total number of pages in mmap segment - 1
247         ushort hashsize;                        // size of Hash Table for pool entries
248         volatile uint evicted;          // last evicted hash table slot
249         ushort *hash;                           // pool index for hash entries
250         BtSpinLatch *latch;                     // latches for hash table slots
251         BtLatchMgr *latchmgr;           // mapped latch page from allocation page
252         BtLatchSet *latchsets;          // mapped latch set from latch pages
253         BtPool *pool;                           // memory pool page segments
254 #ifndef unix
255         HANDLE halloc;                          // allocation and latch table handle
256 #endif
257 } BtMgr;
258
259 typedef struct {
260         BtMgr *mgr;                     // buffer manager for thread
261         BtPage cursor;          // cached frame for start/next (never mapped)
262         BtPage frame;           // spare frame for the page split (never mapped)
263         uid cursor_page;        // current cursor page number   
264         unsigned char *mem;     // frame, cursor, page memory buffer
265         int found;                      // last delete or insert was found
266         int err;                        // last error
267 } BtDb;
268
269 typedef enum {
270         BTERR_ok = 0,
271         BTERR_struct,
272         BTERR_ovflw,
273         BTERR_lock,
274         BTERR_map,
275         BTERR_wrt,
276         BTERR_hash
277 } BTERR;
278
279 // B-Tree functions
280 extern void bt_close (BtDb *bt);
281 extern BtDb *bt_open (BtMgr *mgr);
282 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen);
283 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
284 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint vallen);
285 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
286 extern uint bt_nextkey   (BtDb *bt, uint slot);
287
288 //      manager functions
289 extern BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolsize, uint segsize, uint hashsize);
290 void bt_mgrclose (BtMgr *mgr);
291
292 //  Helper functions to return slot values
293 //      from the cursor page.
294
295 extern BtKey bt_key (BtDb *bt, uint slot);
296 extern BtVal bt_val (BtDb *bt, uint slot);
297
298 //  BTree page number constants
299 #define ALLOC_page              0       // allocation & latch manager hash table
300 #define ROOT_page               1       // root of the btree
301 #define LEAF_page               2       // first page of leaves
302 #define LATCH_page              3       // pages for latch manager
303
304 //      Number of levels to create in a new BTree
305
306 #define MIN_lvl                 2
307
308 //  The page is allocated from low and hi ends.
309 //  The key slots are allocated from the bottom,
310 //      while the text and value of the key
311 //  are allocated from the top.  When the two
312 //  areas meet, the page is split into two.
313
314 //  A key consists of a length byte, two bytes of
315 //  index number (0 - 65534), and up to 253 bytes
316 //  of key value.  Duplicate keys are discarded.
317 //  Associated with each key is a value byte string
318 //      containing any value desired.
319
320 //  The b-tree root is always located at page 1.
321 //      The first leaf page of level zero is always
322 //      located on page 2.
323
324 //      The b-tree pages are linked with next
325 //      pointers to facilitate enumerators,
326 //      and provide for concurrency.
327
328 //      When to root page fills, it is split in two and
329 //      the tree height is raised by a new root at page
330 //      one with two keys.
331
332 //      Deleted keys are marked with a dead bit until
333 //      page cleanup. The fence key for a leaf node is
334 //      always present
335
336 //  Groups of pages called segments from the btree are optionally
337 //  cached with a memory mapped pool. A hash table is used to keep
338 //  track of the cached segments.  This behaviour is controlled
339 //  by the cache block size parameter to bt_open.
340
341 //  To achieve maximum concurrency one page is locked at a time
342 //  as the tree is traversed to find leaf key in question. The right
343 //  page numbers are used in cases where the page is being split,
344 //      or consolidated.
345
346 //  Page 0 is dedicated to lock for new page extensions,
347 //      and chains empty pages together for reuse. It also
348 //      contains the latch manager hash table.
349
350 //      The ParentModification lock on a node is obtained to serialize posting
351 //      or changing the fence key for a node.
352
353 //      Empty pages are chained together through the ALLOC page and reused.
354
355 //      Access macros to address slot and key values from the page
356 //      Page slots use 1 based indexing.
357
358 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
359 #define keyptr(page, slot) ((BtKey)((unsigned char*)(page) + slotptr(page, slot)->off))
360 #define valptr(page, slot) ((BtVal)(keyptr(page,slot)->key + keyptr(page,slot)->len))
361
362 void bt_putid(unsigned char *dest, uid id)
363 {
364 int i = BtId;
365
366         while( i-- )
367                 dest[i] = (unsigned char)id, id >>= 8;
368 }
369
370 uid bt_getid(unsigned char *src)
371 {
372 uid id = 0;
373 int i;
374
375         for( i = 0; i < BtId; i++ )
376                 id <<= 8, id |= *src++; 
377
378         return id;
379 }
380
381 //      Phase-Fair reader/writer lock implementation
382
383 void WriteLock (RWLock *lock)
384 {
385 ushort w, r, tix;
386
387 #ifdef unix
388         tix = __sync_fetch_and_add (lock->ticket, 1);
389 #else
390         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
391 #endif
392         // wait for our ticket to come up
393
394         while( tix != lock->serving[0] )
395 #ifdef unix
396                 sched_yield();
397 #else
398                 SwitchToThread ();
399 #endif
400
401         w = PRES | (tix & PHID);
402 #ifdef  unix
403         r = __sync_fetch_and_add (lock->rin, w);
404 #else
405         r = _InterlockedExchangeAdd16 (lock->rin, w);
406 #endif
407         while( r != *lock->rout )
408 #ifdef unix
409                 sched_yield();
410 #else
411                 SwitchToThread();
412 #endif
413 }
414
415 void WriteRelease (RWLock *lock)
416 {
417 #ifdef unix
418         __sync_fetch_and_and (lock->rin, ~MASK);
419 #else
420         _InterlockedAnd16 (lock->rin, ~MASK);
421 #endif
422         lock->serving[0]++;
423 }
424
425 void ReadLock (RWLock *lock)
426 {
427 ushort w;
428 #ifdef unix
429         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
430 #else
431         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
432 #endif
433         if( w )
434           while( w == (*lock->rin & MASK) )
435 #ifdef unix
436                 sched_yield ();
437 #else
438                 SwitchToThread ();
439 #endif
440 }
441
442 void ReadRelease (RWLock *lock)
443 {
444 #ifdef unix
445         __sync_fetch_and_add (lock->rout, RINC);
446 #else
447         _InterlockedExchangeAdd16 (lock->rout, RINC);
448 #endif
449 }
450
451 //      Spin Latch Manager
452
453 //      wait until write lock mode is clear
454 //      and add 1 to the share count
455
456 void bt_spinreadlock(BtSpinLatch *latch)
457 {
458 ushort prev;
459
460   do {
461 #ifdef unix
462         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
463 #else
464         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
465 #endif
466         //  see if exclusive request is granted or pending
467
468         if( !(prev & BOTH) )
469                 return;
470 #ifdef unix
471         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
472 #else
473         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
474 #endif
475 #ifdef  unix
476   } while( sched_yield(), 1 );
477 #else
478   } while( SwitchToThread(), 1 );
479 #endif
480 }
481
482 //      wait for other read and write latches to relinquish
483
484 void bt_spinwritelock(BtSpinLatch *latch)
485 {
486 ushort prev;
487
488   do {
489 #ifdef  unix
490         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
491 #else
492         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
493 #endif
494         if( !(prev & XCL) )
495           if( !(prev & ~BOTH) )
496                 return;
497           else
498 #ifdef unix
499                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
500 #else
501                 _InterlockedAnd16((ushort *)latch, ~XCL);
502 #endif
503 #ifdef  unix
504   } while( sched_yield(), 1 );
505 #else
506   } while( SwitchToThread(), 1 );
507 #endif
508 }
509
510 //      try to obtain write lock
511
512 //      return 1 if obtained,
513 //              0 otherwise
514
515 int bt_spinwritetry(BtSpinLatch *latch)
516 {
517 ushort prev;
518
519 #ifdef  unix
520         prev = __sync_fetch_and_or((ushort *)latch, XCL);
521 #else
522         prev = _InterlockedOr16((ushort *)latch, XCL);
523 #endif
524         //      take write access if all bits are clear
525
526         if( !(prev & XCL) )
527           if( !(prev & ~BOTH) )
528                 return 1;
529           else
530 #ifdef unix
531                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
532 #else
533                 _InterlockedAnd16((ushort *)latch, ~XCL);
534 #endif
535         return 0;
536 }
537
538 //      clear write mode
539
540 void bt_spinreleasewrite(BtSpinLatch *latch)
541 {
542 #ifdef unix
543         __sync_fetch_and_and((ushort *)latch, ~BOTH);
544 #else
545         _InterlockedAnd16((ushort *)latch, ~BOTH);
546 #endif
547 }
548
549 //      decrement reader count
550
551 void bt_spinreleaseread(BtSpinLatch *latch)
552 {
553 #ifdef unix
554         __sync_fetch_and_add((ushort *)latch, -SHARE);
555 #else
556         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
557 #endif
558 }
559
560 //      link latch table entry into latch hash table
561
562 void bt_latchlink (BtDb *bt, ushort hashidx, ushort victim, uid page_no)
563 {
564 BtLatchSet *set = bt->mgr->latchsets + victim;
565
566         if( set->next = bt->mgr->latchmgr->table[hashidx].slot )
567                 bt->mgr->latchsets[set->next].prev = victim;
568
569         bt->mgr->latchmgr->table[hashidx].slot = victim;
570         set->page_no = page_no;
571         set->hash = hashidx;
572         set->prev = 0;
573 }
574
575 //      release latch pin
576
577 void bt_unpinlatch (BtLatchSet *set)
578 {
579 #ifdef unix
580         __sync_fetch_and_add(&set->pin, -1);
581 #else
582         _InterlockedDecrement16 (&set->pin);
583 #endif
584 }
585
586 //      find existing latchset or inspire new one
587 //      return with latchset pinned
588
589 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no)
590 {
591 ushort hashidx = page_no % bt->mgr->latchmgr->latchhash;
592 ushort slot, avail = 0, victim, idx;
593 BtLatchSet *set;
594
595         //  obtain read lock on hash table entry
596
597         bt_spinreadlock(bt->mgr->latchmgr->table[hashidx].latch);
598
599         if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
600         {
601                 set = bt->mgr->latchsets + slot;
602                 if( page_no == set->page_no )
603                         break;
604         } while( slot = set->next );
605
606         if( slot ) {
607 #ifdef unix
608                 __sync_fetch_and_add(&set->pin, 1);
609 #else
610                 _InterlockedIncrement16 (&set->pin);
611 #endif
612         }
613
614     bt_spinreleaseread (bt->mgr->latchmgr->table[hashidx].latch);
615
616         if( slot )
617                 return set;
618
619   //  try again, this time with write lock
620
621   bt_spinwritelock(bt->mgr->latchmgr->table[hashidx].latch);
622
623   if( slot = bt->mgr->latchmgr->table[hashidx].slot ) do
624   {
625         set = bt->mgr->latchsets + slot;
626         if( page_no == set->page_no )
627                 break;
628         if( !set->pin && !avail )
629                 avail = slot;
630   } while( slot = set->next );
631
632   //  found our entry, or take over an unpinned one
633
634   if( slot || (slot = avail) ) {
635         set = bt->mgr->latchsets + slot;
636 #ifdef unix
637         __sync_fetch_and_add(&set->pin, 1);
638 #else
639         _InterlockedIncrement16 (&set->pin);
640 #endif
641         set->page_no = page_no;
642         bt_spinreleasewrite(bt->mgr->latchmgr->table[hashidx].latch);
643         return set;
644   }
645
646         //  see if there are any unused entries
647 #ifdef unix
648         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, 1) + 1;
649 #else
650         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchdeployed);
651 #endif
652
653         if( victim < bt->mgr->latchmgr->latchtotal ) {
654                 set = bt->mgr->latchsets + victim;
655 #ifdef unix
656                 __sync_fetch_and_add(&set->pin, 1);
657 #else
658                 _InterlockedIncrement16 (&set->pin);
659 #endif
660                 bt_latchlink (bt, hashidx, victim, page_no);
661                 bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
662                 return set;
663         }
664
665 #ifdef unix
666         victim = __sync_fetch_and_add (&bt->mgr->latchmgr->latchdeployed, -1);
667 #else
668         victim = _InterlockedDecrement16 (&bt->mgr->latchmgr->latchdeployed);
669 #endif
670   //  find and reuse previous lock entry
671
672   while( 1 ) {
673 #ifdef unix
674         victim = __sync_fetch_and_add(&bt->mgr->latchmgr->latchvictim, 1);
675 #else
676         victim = _InterlockedIncrement16 (&bt->mgr->latchmgr->latchvictim) - 1;
677 #endif
678         //      we don't use slot zero
679
680         if( victim %= bt->mgr->latchmgr->latchtotal )
681                 set = bt->mgr->latchsets + victim;
682         else
683                 continue;
684
685         //      take control of our slot
686         //      from other threads
687
688         if( set->pin || !bt_spinwritetry (set->busy) )
689                 continue;
690
691         idx = set->hash;
692
693         // try to get write lock on hash chain
694         //      skip entry if not obtained
695         //      or has outstanding locks
696
697         if( !bt_spinwritetry (bt->mgr->latchmgr->table[idx].latch) ) {
698                 bt_spinreleasewrite (set->busy);
699                 continue;
700         }
701
702         if( set->pin ) {
703                 bt_spinreleasewrite (set->busy);
704                 bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
705                 continue;
706         }
707
708         //  unlink our available victim from its hash chain
709
710         if( set->prev )
711                 bt->mgr->latchsets[set->prev].next = set->next;
712         else
713                 bt->mgr->latchmgr->table[idx].slot = set->next;
714
715         if( set->next )
716                 bt->mgr->latchsets[set->next].prev = set->prev;
717
718         bt_spinreleasewrite (bt->mgr->latchmgr->table[idx].latch);
719 #ifdef unix
720         __sync_fetch_and_add(&set->pin, 1);
721 #else
722         _InterlockedIncrement16 (&set->pin);
723 #endif
724         bt_latchlink (bt, hashidx, victim, page_no);
725         bt_spinreleasewrite (bt->mgr->latchmgr->table[hashidx].latch);
726         bt_spinreleasewrite (set->busy);
727         return set;
728   }
729 }
730
731 void bt_mgrclose (BtMgr *mgr)
732 {
733 BtPool *pool;
734 uint slot;
735
736         // release mapped pages
737         //      note that slot zero is never used
738
739         for( slot = 1; slot < mgr->poolmax; slot++ ) {
740                 pool = mgr->pool + slot;
741                 if( pool->slot )
742 #ifdef unix
743                         munmap (pool->map, (uid)(mgr->poolmask+1) << mgr->page_bits);
744 #else
745                 {
746                         FlushViewOfFile(pool->map, 0);
747                         UnmapViewOfFile(pool->map);
748                         CloseHandle(pool->hmap);
749                 }
750 #endif
751         }
752
753 #ifdef unix
754         munmap (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
755         munmap (mgr->latchmgr, 2 * mgr->page_size);
756 #else
757         FlushViewOfFile(mgr->latchmgr, 0);
758         UnmapViewOfFile(mgr->latchmgr);
759         CloseHandle(mgr->halloc);
760 #endif
761 #ifdef unix
762         close (mgr->idx);
763         free (mgr->pool);
764         free (mgr->hash);
765         free ((void *)mgr->latch);
766         free (mgr);
767 #else
768         FlushFileBuffers(mgr->idx);
769         CloseHandle(mgr->idx);
770         GlobalFree (mgr->pool);
771         GlobalFree (mgr->hash);
772         GlobalFree ((void *)mgr->latch);
773         GlobalFree (mgr);
774 #endif
775 }
776
777 //      close and release memory
778
779 void bt_close (BtDb *bt)
780 {
781 #ifdef unix
782         if( bt->mem )
783                 free (bt->mem);
784 #else
785         if( bt->mem)
786                 VirtualFree (bt->mem, 0, MEM_RELEASE);
787 #endif
788         free (bt);
789 }
790
791 //  open/create new btree buffer manager
792
793 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
794 //              size of mapped page pool (e.g. 8192)
795
796 BtMgr *bt_mgr (char *name, uint mode, uint bits, uint poolmax, uint segsize, uint hashsize)
797 {
798 uint lvl, attr, cacheblk, last, slot, idx;
799 uint nlatchpage, latchhash;
800 unsigned char value[BtId];
801 BtLatchMgr *latchmgr;
802 off64_t size;
803 uint amt[1];
804 BtMgr* mgr;
805 BtKey key;
806 BtVal val;
807 int flag;
808
809 #ifndef unix
810 SYSTEM_INFO sysinfo[1];
811 #endif
812
813         // determine sanity of page size and buffer pool
814
815         if( bits > BT_maxbits )
816                 bits = BT_maxbits;
817         else if( bits < BT_minbits )
818                 bits = BT_minbits;
819
820         if( !poolmax )
821                 return NULL;    // must have buffer pool
822
823 #ifdef unix
824         mgr = calloc (1, sizeof(BtMgr));
825
826         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
827
828         if( mgr->idx == -1 )
829                 return free(mgr), NULL;
830         
831         cacheblk = 4096;        // minimum mmap segment size for unix
832
833 #else
834         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
835         attr = FILE_ATTRIBUTE_NORMAL;
836         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
837
838         if( mgr->idx == INVALID_HANDLE_VALUE )
839                 return GlobalFree(mgr), NULL;
840
841         // normalize cacheblk to multiple of sysinfo->dwAllocationGranularity
842         GetSystemInfo(sysinfo);
843         cacheblk = sysinfo->dwAllocationGranularity;
844 #endif
845
846 #ifdef unix
847         latchmgr = malloc (BT_maxpage);
848         *amt = 0;
849
850         // read minimum page size to get root info
851
852         if( size = lseek (mgr->idx, 0L, 2) ) {
853                 if( pread(mgr->idx, latchmgr, BT_minpage, 0) == BT_minpage )
854                         bits = latchmgr->alloc->bits;
855                 else
856                         return free(mgr), free(latchmgr), NULL;
857         } else if( mode == BT_ro )
858                 return free(latchmgr), bt_mgrclose (mgr), NULL;
859 #else
860         latchmgr = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
861         size = GetFileSize(mgr->idx, amt);
862
863         if( size || *amt ) {
864                 if( !ReadFile(mgr->idx, (char *)latchmgr, BT_minpage, amt, NULL) )
865                         return bt_mgrclose (mgr), NULL;
866                 bits = latchmgr->alloc->bits;
867         } else if( mode == BT_ro )
868                 return bt_mgrclose (mgr), NULL;
869 #endif
870
871         mgr->page_size = 1 << bits;
872         mgr->page_bits = bits;
873
874         mgr->poolmax = poolmax;
875         mgr->mode = mode;
876
877         if( cacheblk < mgr->page_size )
878                 cacheblk = mgr->page_size;
879
880         //  mask for partial memmaps
881
882         mgr->poolmask = (cacheblk >> bits) - 1;
883
884         //      see if requested size of pages per memmap is greater
885
886         if( (1 << segsize) > mgr->poolmask )
887                 mgr->poolmask = (1 << segsize) - 1;
888
889         mgr->seg_bits = 0;
890
891         while( (1 << mgr->seg_bits) <= mgr->poolmask )
892                 mgr->seg_bits++;
893
894         mgr->hashsize = hashsize;
895
896 #ifdef unix
897         mgr->pool = calloc (poolmax, sizeof(BtPool));
898         mgr->hash = calloc (hashsize, sizeof(ushort));
899         mgr->latch = calloc (hashsize, sizeof(BtSpinLatch));
900 #else
901         mgr->pool = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, poolmax * sizeof(BtPool));
902         mgr->hash = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(ushort));
903         mgr->latch = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, hashsize * sizeof(BtSpinLatch));
904 #endif
905
906         if( size || *amt )
907                 goto mgrlatch;
908
909         // initialize an empty b-tree with latch page, root page, page of leaves
910         // and page(s) of latches
911
912         memset (latchmgr, 0, 1 << bits);
913         nlatchpage = BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1; 
914         bt_putid(latchmgr->alloc->right, MIN_lvl+1+nlatchpage);
915         latchmgr->alloc->bits = mgr->page_bits;
916
917         latchmgr->nlatchpage = nlatchpage;
918         latchmgr->latchtotal = nlatchpage * (mgr->page_size / sizeof(BtLatchSet));
919
920         //  initialize latch manager
921
922         latchhash = (mgr->page_size - sizeof(BtLatchMgr)) / sizeof(BtHashEntry);
923
924         //      size of hash table = total number of latchsets
925
926         if( latchhash > latchmgr->latchtotal )
927                 latchhash = latchmgr->latchtotal;
928
929         latchmgr->latchhash = latchhash;
930
931 #ifdef unix
932         if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
933                 return bt_mgrclose (mgr), NULL;
934 #else
935         if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
936                 return bt_mgrclose (mgr), NULL;
937
938         if( *amt < mgr->page_size )
939                 return bt_mgrclose (mgr), NULL;
940 #endif
941
942         memset (latchmgr, 0, 1 << bits);
943         latchmgr->alloc->bits = mgr->page_bits;
944
945         for( lvl=MIN_lvl; lvl--; ) {
946                 slotptr(latchmgr->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + 1: 1);
947                 key = keyptr(latchmgr->alloc, 1);
948                 key->len = 2;           // create stopper key
949                 key->key[0] = 0xff;
950                 key->key[1] = 0xff;
951
952                 bt_putid(value, MIN_lvl - lvl + 1);
953                 val = valptr(latchmgr->alloc, 1);
954                 val->len = lvl ? BtId : 0;
955                 memcpy (val->value, value, val->len);
956
957                 latchmgr->alloc->min = slotptr(latchmgr->alloc, 1)->off;
958                 latchmgr->alloc->lvl = lvl;
959                 latchmgr->alloc->cnt = 1;
960                 latchmgr->alloc->act = 1;
961 #ifdef unix
962                 if( write (mgr->idx, latchmgr, mgr->page_size) < mgr->page_size )
963                         return bt_mgrclose (mgr), NULL;
964 #else
965                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
966                         return bt_mgrclose (mgr), NULL;
967
968                 if( *amt < mgr->page_size )
969                         return bt_mgrclose (mgr), NULL;
970 #endif
971         }
972
973         // clear out latch manager locks
974         //      and rest of pages to round out segment
975
976         memset(latchmgr, 0, mgr->page_size);
977         last = MIN_lvl + 1;
978
979         while( last <= ((MIN_lvl + 1 + nlatchpage) | mgr->poolmask) ) {
980 #ifdef unix
981                 pwrite(mgr->idx, latchmgr, mgr->page_size, last << mgr->page_bits);
982 #else
983                 SetFilePointer (mgr->idx, last << mgr->page_bits, NULL, FILE_BEGIN);
984                 if( !WriteFile (mgr->idx, (char *)latchmgr, mgr->page_size, amt, NULL) )
985                         return bt_mgrclose (mgr), NULL;
986                 if( *amt < mgr->page_size )
987                         return bt_mgrclose (mgr), NULL;
988 #endif
989                 last++;
990         }
991
992 mgrlatch:
993 #ifdef unix
994         // mlock the root page and the latchmgr page
995
996         flag = PROT_READ | PROT_WRITE;
997         mgr->latchmgr = mmap (0, 2 * mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page * mgr->page_size);
998         if( mgr->latchmgr == MAP_FAILED )
999                 return bt_mgrclose (mgr), NULL;
1000         mlock (mgr->latchmgr, 2 * mgr->page_size);
1001
1002         mgr->latchsets = (BtLatchSet *)mmap (0, mgr->latchmgr->nlatchpage * mgr->page_size, flag, MAP_SHARED, mgr->idx, LATCH_page * mgr->page_size);
1003         if( mgr->latchsets == MAP_FAILED )
1004                 return bt_mgrclose (mgr), NULL;
1005         mlock (mgr->latchsets, mgr->latchmgr->nlatchpage * mgr->page_size);
1006 #else
1007         flag = PAGE_READWRITE;
1008         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size, NULL);
1009         if( !mgr->halloc )
1010                 return bt_mgrclose (mgr), NULL;
1011
1012         flag = FILE_MAP_WRITE;
1013         mgr->latchmgr = MapViewOfFile(mgr->halloc, flag, 0, 0, (BT_latchtable / (mgr->page_size / sizeof(BtLatchSet)) + 1 + LATCH_page) * mgr->page_size);
1014         if( !mgr->latchmgr )
1015                 return GetLastError(), bt_mgrclose (mgr), NULL;
1016
1017         mgr->latchsets = (void *)((char *)mgr->latchmgr + LATCH_page * mgr->page_size);
1018 #endif
1019
1020 #ifdef unix
1021         free (latchmgr);
1022 #else
1023         VirtualFree (latchmgr, 0, MEM_RELEASE);
1024 #endif
1025         return mgr;
1026 }
1027
1028 //      open BTree access method
1029 //      based on buffer manager
1030
1031 BtDb *bt_open (BtMgr *mgr)
1032 {
1033 BtDb *bt = malloc (sizeof(*bt));
1034
1035         memset (bt, 0, sizeof(*bt));
1036         bt->mgr = mgr;
1037 #ifdef unix
1038         bt->mem = malloc (2 *mgr->page_size);
1039 #else
1040         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1041 #endif
1042         bt->frame = (BtPage)bt->mem;
1043         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1044         return bt;
1045 }
1046
1047 //  compare two keys, returning > 0, = 0, or < 0
1048 //  as the comparison value
1049
1050 int keycmp (BtKey key1, unsigned char *key2, uint len2)
1051 {
1052 uint len1 = key1->len;
1053 int ans;
1054
1055         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1056                 return ans;
1057
1058         if( len1 > len2 )
1059                 return 1;
1060         if( len1 < len2 )
1061                 return -1;
1062
1063         return 0;
1064 }
1065
1066 //      Buffer Pool mgr
1067
1068 // find segment in pool
1069 // must be called with hashslot idx locked
1070 //      return NULL if not there
1071 //      otherwise return node
1072
1073 BtPool *bt_findpool(BtDb *bt, uid page_no, uint idx)
1074 {
1075 BtPool *pool;
1076 uint slot;
1077
1078         // compute start of hash chain in pool
1079
1080         if( slot = bt->mgr->hash[idx] ) 
1081                 pool = bt->mgr->pool + slot;
1082         else
1083                 return NULL;
1084
1085         page_no &= ~bt->mgr->poolmask;
1086
1087         while( pool->basepage != page_no )
1088           if( pool = pool->hashnext )
1089                 continue;
1090           else
1091                 return NULL;
1092
1093         return pool;
1094 }
1095
1096 // add segment to hash table
1097
1098 void bt_linkhash(BtDb *bt, BtPool *pool, uid page_no, int idx)
1099 {
1100 BtPool *node;
1101 uint slot;
1102
1103         pool->hashprev = pool->hashnext = NULL;
1104         pool->basepage = page_no & ~bt->mgr->poolmask;
1105         pool->pin = CLOCK_bit + 1;
1106
1107         if( slot = bt->mgr->hash[idx] ) {
1108                 node = bt->mgr->pool + slot;
1109                 pool->hashnext = node;
1110                 node->hashprev = pool;
1111         }
1112
1113         bt->mgr->hash[idx] = pool->slot;
1114 }
1115
1116 //  map new buffer pool segment to virtual memory
1117
1118 BTERR bt_mapsegment(BtDb *bt, BtPool *pool, uid page_no)
1119 {
1120 off64_t off = (page_no & ~bt->mgr->poolmask) << bt->mgr->page_bits;
1121 off64_t limit = off + ((bt->mgr->poolmask+1) << bt->mgr->page_bits);
1122 int flag;
1123
1124 #ifdef unix
1125         flag = PROT_READ | ( bt->mgr->mode == BT_ro ? 0 : PROT_WRITE );
1126         pool->map = mmap (0, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits, flag, MAP_SHARED, bt->mgr->idx, off);
1127         if( pool->map == MAP_FAILED )
1128                 return bt->err = BTERR_map;
1129
1130 #else
1131         flag = ( bt->mgr->mode == BT_ro ? PAGE_READONLY : PAGE_READWRITE );
1132         pool->hmap = CreateFileMapping(bt->mgr->idx, NULL, flag, (DWORD)(limit >> 32), (DWORD)limit, NULL);
1133         if( !pool->hmap )
1134                 return bt->err = BTERR_map;
1135
1136         flag = ( bt->mgr->mode == BT_ro ? FILE_MAP_READ : FILE_MAP_WRITE );
1137         pool->map = MapViewOfFile(pool->hmap, flag, (DWORD)(off >> 32), (DWORD)off, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1138         if( !pool->map )
1139                 return bt->err = BTERR_map;
1140 #endif
1141         return bt->err = 0;
1142 }
1143
1144 //      calculate page within pool
1145
1146 BtPage bt_page (BtDb *bt, BtPool *pool, uid page_no)
1147 {
1148 uint subpage = (uint)(page_no & bt->mgr->poolmask); // page within mapping
1149 BtPage page;
1150
1151         page = (BtPage)(pool->map + (subpage << bt->mgr->page_bits));
1152 //      madvise (page, bt->mgr->page_size, MADV_WILLNEED);
1153         return page;
1154 }
1155
1156 //  release pool pin
1157
1158 void bt_unpinpool (BtPool *pool)
1159 {
1160 #ifdef unix
1161         __sync_fetch_and_add(&pool->pin, -1);
1162 #else
1163         _InterlockedDecrement16 (&pool->pin);
1164 #endif
1165 }
1166
1167 //      find or place requested page in segment-pool
1168 //      return pool table entry, incrementing pin
1169
1170 BtPool *bt_pinpool(BtDb *bt, uid page_no)
1171 {
1172 uint slot, hashidx, idx, victim;
1173 BtPool *pool, *node, *next;
1174
1175         //      lock hash table chain
1176
1177         hashidx = (uint)(page_no >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1178         bt_spinwritelock (&bt->mgr->latch[hashidx]);
1179
1180         //      look up in hash table
1181
1182         if( pool = bt_findpool(bt, page_no, hashidx) ) {
1183 #ifdef unix
1184                 __sync_fetch_and_or(&pool->pin, CLOCK_bit);
1185                 __sync_fetch_and_add(&pool->pin, 1);
1186 #else
1187                 _InterlockedOr16 (&pool->pin, CLOCK_bit);
1188                 _InterlockedIncrement16 (&pool->pin);
1189 #endif
1190                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1191                 return pool;
1192         }
1193
1194         // allocate a new pool node
1195         // and add to hash table
1196
1197 #ifdef unix
1198         slot = __sync_fetch_and_add(&bt->mgr->poolcnt, 1);
1199 #else
1200         slot = _InterlockedIncrement16 (&bt->mgr->poolcnt) - 1;
1201 #endif
1202
1203         if( ++slot < bt->mgr->poolmax ) {
1204                 pool = bt->mgr->pool + slot;
1205                 pool->slot = slot;
1206
1207                 if( bt_mapsegment(bt, pool, page_no) )
1208                         return NULL;
1209
1210                 bt_linkhash(bt, pool, page_no, hashidx);
1211                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1212                 return pool;
1213         }
1214
1215         // pool table is full
1216         //      find best pool entry to evict
1217
1218 #ifdef unix
1219         __sync_fetch_and_add(&bt->mgr->poolcnt, -1);
1220 #else
1221         _InterlockedDecrement16 (&bt->mgr->poolcnt);
1222 #endif
1223
1224         while( 1 ) {
1225 #ifdef unix
1226                 victim = __sync_fetch_and_add(&bt->mgr->evicted, 1);
1227 #else
1228                 victim = _InterlockedIncrement (&bt->mgr->evicted) - 1;
1229 #endif
1230                 victim %= bt->mgr->poolmax;
1231                 pool = bt->mgr->pool + victim;
1232                 idx = (uint)(pool->basepage >> bt->mgr->seg_bits) % bt->mgr->hashsize;
1233
1234                 if( !victim )
1235                         continue;
1236
1237                 // try to get write lock
1238                 //      skip entry if not obtained
1239
1240                 if( !bt_spinwritetry (&bt->mgr->latch[idx]) )
1241                         continue;
1242
1243                 //      skip this entry if
1244                 //      page is pinned
1245                 //  or clock bit is set
1246
1247                 if( pool->pin ) {
1248 #ifdef unix
1249                         __sync_fetch_and_and(&pool->pin, ~CLOCK_bit);
1250 #else
1251                         _InterlockedAnd16 (&pool->pin, ~CLOCK_bit);
1252 #endif
1253                         bt_spinreleasewrite (&bt->mgr->latch[idx]);
1254                         continue;
1255                 }
1256
1257                 // unlink victim pool node from hash table
1258
1259                 if( node = pool->hashprev )
1260                         node->hashnext = pool->hashnext;
1261                 else if( node = pool->hashnext )
1262                         bt->mgr->hash[idx] = node->slot;
1263                 else
1264                         bt->mgr->hash[idx] = 0;
1265
1266                 if( node = pool->hashnext )
1267                         node->hashprev = pool->hashprev;
1268
1269                 bt_spinreleasewrite (&bt->mgr->latch[idx]);
1270
1271                 //      remove old file mapping
1272 #ifdef unix
1273                 munmap (pool->map, (uid)(bt->mgr->poolmask+1) << bt->mgr->page_bits);
1274 #else
1275 //              FlushViewOfFile(pool->map, 0);
1276                 UnmapViewOfFile(pool->map);
1277                 CloseHandle(pool->hmap);
1278 #endif
1279                 pool->map = NULL;
1280
1281                 //  create new pool mapping
1282                 //  and link into hash table
1283
1284                 if( bt_mapsegment(bt, pool, page_no) )
1285                         return NULL;
1286
1287                 bt_linkhash(bt, pool, page_no, hashidx);
1288                 bt_spinreleasewrite (&bt->mgr->latch[hashidx]);
1289                 return pool;
1290         }
1291 }
1292
1293 // place write, read, or parent lock on requested page_no.
1294
1295 void bt_lockpage(BtLock mode, BtLatchSet *set)
1296 {
1297         switch( mode ) {
1298         case BtLockRead:
1299                 ReadLock (set->readwr);
1300                 break;
1301         case BtLockWrite:
1302                 WriteLock (set->readwr);
1303                 break;
1304         case BtLockAccess:
1305                 ReadLock (set->access);
1306                 break;
1307         case BtLockDelete:
1308                 WriteLock (set->access);
1309                 break;
1310         case BtLockParent:
1311                 WriteLock (set->parent);
1312                 break;
1313         }
1314 }
1315
1316 // remove write, read, or parent lock on requested page
1317
1318 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1319 {
1320         switch( mode ) {
1321         case BtLockRead:
1322                 ReadRelease (set->readwr);
1323                 break;
1324         case BtLockWrite:
1325                 WriteRelease (set->readwr);
1326                 break;
1327         case BtLockAccess:
1328                 ReadRelease (set->access);
1329                 break;
1330         case BtLockDelete:
1331                 WriteRelease (set->access);
1332                 break;
1333         case BtLockParent:
1334                 WriteRelease (set->parent);
1335                 break;
1336         }
1337 }
1338
1339 //      allocate a new page and write page into it
1340
1341 uid bt_newpage(BtDb *bt, BtPage page)
1342 {
1343 BtPageSet set[1];
1344 uid new_page;
1345 int blk;
1346
1347         //      lock allocation page
1348
1349         bt_spinwritelock(bt->mgr->latchmgr->lock);
1350
1351         // use empty chain first
1352         // else allocate empty page
1353
1354         if( new_page = bt_getid(bt->mgr->latchmgr->chain) ) {
1355                 if( set->pool = bt_pinpool (bt, new_page) )
1356                         set->page = bt_page (bt, set->pool, new_page);
1357                 else
1358                         return 0;
1359
1360                 bt_putid(bt->mgr->latchmgr->chain, bt_getid(set->page->right));
1361                 bt_unpinpool (set->pool);
1362         } else {
1363                 new_page = bt_getid(bt->mgr->latchmgr->alloc->right);
1364                 bt_putid(bt->mgr->latchmgr->alloc->right, new_page+1);
1365 #ifdef unix
1366                 // if writing first page of pool block, set file length thru last page
1367
1368                 if( (new_page & bt->mgr->poolmask) == 0 )
1369                         ftruncate (bt->mgr->idx, (new_page + bt->mgr->poolmask + 1) << bt->mgr->page_bits);
1370 #endif
1371         }
1372 #ifdef unix
1373         // unlock allocation latch
1374
1375         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1376 #endif
1377
1378         //      bring new page into pool and copy page.
1379         //      this will extend the file into the new pages on WIN32.
1380
1381         if( set->pool = bt_pinpool (bt, new_page) )
1382                 set->page = bt_page (bt, set->pool, new_page);
1383         else
1384                 return 0;
1385
1386         memcpy(set->page, page, bt->mgr->page_size);
1387         bt_unpinpool (set->pool);
1388
1389 #ifndef unix
1390         // unlock allocation latch
1391
1392         bt_spinreleasewrite(bt->mgr->latchmgr->lock);
1393 #endif
1394         return new_page;
1395 }
1396
1397 //  find slot in page for given key at a given level
1398
1399 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1400 {
1401 uint diff, higher = set->page->cnt, low = 1, slot;
1402 uint good = 0;
1403
1404         //        make stopper key an infinite fence value
1405
1406         if( bt_getid (set->page->right) )
1407                 higher++;
1408         else
1409                 good++;
1410
1411         //      low is the lowest candidate.
1412         //  loop ends when they meet
1413
1414         //  higher is already
1415         //      tested as .ge. the passed key.
1416
1417         while( diff = higher - low ) {
1418                 slot = low + ( diff >> 1 );
1419                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1420                         low = slot + 1;
1421                 else
1422                         higher = slot, good++;
1423         }
1424
1425         //      return zero if key is on right link page
1426
1427         return good ? higher : 0;
1428 }
1429
1430 //  find and load page at given level for given key
1431 //      leave page rd or wr locked as requested
1432
1433 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1434 {
1435 uid page_no = ROOT_page, prevpage = 0;
1436 uint drill = 0xff, slot;
1437 BtLatchSet *prevlatch;
1438 uint mode, prevmode;
1439 BtPool *prevpool;
1440
1441   //  start at root of btree and drill down
1442
1443   do {
1444         // determine lock mode of drill level
1445         mode = (drill == lvl) ? lock : BtLockRead; 
1446
1447         set->latch = bt_pinlatch (bt, page_no);
1448         set->page_no = page_no;
1449
1450         // pin page contents
1451
1452         if( set->pool = bt_pinpool (bt, page_no) )
1453                 set->page = bt_page (bt, set->pool, page_no);
1454         else
1455                 return 0;
1456
1457         // obtain access lock using lock chaining with Access mode
1458
1459         if( page_no > ROOT_page )
1460           bt_lockpage(BtLockAccess, set->latch);
1461
1462         //      release & unpin parent page
1463
1464         if( prevpage ) {
1465           bt_unlockpage(prevmode, prevlatch);
1466           bt_unpinlatch (prevlatch);
1467           bt_unpinpool (prevpool);
1468           prevpage = 0;
1469         }
1470
1471         // obtain read lock using lock chaining
1472
1473         bt_lockpage(mode, set->latch);
1474
1475         if( set->page->free )
1476                 return bt->err = BTERR_struct, 0;
1477
1478         if( page_no > ROOT_page )
1479           bt_unlockpage(BtLockAccess, set->latch);
1480
1481         // re-read and re-lock root after determining actual level of root
1482
1483         if( set->page->lvl != drill) {
1484                 if( set->page_no != ROOT_page )
1485                         return bt->err = BTERR_struct, 0;
1486                         
1487                 drill = set->page->lvl;
1488
1489                 if( lock != BtLockRead && drill == lvl ) {
1490                   bt_unlockpage(mode, set->latch);
1491                   bt_unpinlatch (set->latch);
1492                   bt_unpinpool (set->pool);
1493                   continue;
1494                 }
1495         }
1496
1497         prevpage = set->page_no;
1498         prevlatch = set->latch;
1499         prevpool = set->pool;
1500         prevmode = mode;
1501
1502         //  find key on page at this level
1503         //  and descend to requested level
1504
1505         if( set->page->kill )
1506           goto slideright;
1507
1508         if( slot = bt_findslot (set, key, len) ) {
1509           if( drill == lvl )
1510                 return slot;
1511
1512           while( slotptr(set->page, slot)->dead )
1513                 if( slot++ < set->page->cnt )
1514                         continue;
1515                 else
1516                         goto slideright;
1517
1518           page_no = bt_getid(valptr(set->page, slot)->value);
1519           drill--;
1520           continue;
1521         }
1522
1523         //  or slide right into next page
1524
1525 slideright:
1526         page_no = bt_getid(set->page->right);
1527
1528   } while( page_no );
1529
1530   // return error on end of right chain
1531
1532   bt->err = BTERR_struct;
1533   return 0;     // return error
1534 }
1535
1536 //      return page to free list
1537 //      page must be delete & write locked
1538
1539 void bt_freepage (BtDb *bt, BtPageSet *set)
1540 {
1541         //      lock allocation page
1542
1543         bt_spinwritelock (bt->mgr->latchmgr->lock);
1544
1545         //      store chain
1546         memcpy(set->page->right, bt->mgr->latchmgr->chain, BtId);
1547         bt_putid(bt->mgr->latchmgr->chain, set->page_no);
1548         set->page->free = 1;
1549
1550         // unlock released page
1551
1552         bt_unlockpage (BtLockDelete, set->latch);
1553         bt_unlockpage (BtLockWrite, set->latch);
1554         bt_unpinlatch (set->latch);
1555         bt_unpinpool (set->pool);
1556
1557         // unlock allocation page
1558
1559         bt_spinreleasewrite (bt->mgr->latchmgr->lock);
1560 }
1561
1562 //      a fence key was deleted from a page
1563 //      push new fence value upwards
1564
1565 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1566 {
1567 unsigned char leftkey[256], rightkey[256];
1568 unsigned char value[BtId];
1569 BtKey ptr;
1570
1571         //      remove the old fence value
1572
1573         ptr = keyptr(set->page, set->page->cnt);
1574         memcpy (rightkey, ptr, ptr->len + 1);
1575         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1576
1577         ptr = keyptr(set->page, set->page->cnt);
1578         memcpy (leftkey, ptr, ptr->len + 1);
1579
1580         bt_lockpage (BtLockParent, set->latch);
1581         bt_unlockpage (BtLockWrite, set->latch);
1582
1583         //      insert new (now smaller) fence key
1584
1585         bt_putid (value, set->page_no);
1586
1587         if( bt_insertkey (bt, leftkey+1, *leftkey, lvl+1, value, BtId) )
1588           return bt->err;
1589
1590         //      now delete old fence key
1591
1592         if( bt_deletekey (bt, rightkey+1, *rightkey, lvl+1) )
1593                 return bt->err;
1594
1595         bt_unlockpage (BtLockParent, set->latch);
1596         bt_unpinlatch(set->latch);
1597         bt_unpinpool (set->pool);
1598         return 0;
1599 }
1600
1601 //      root has a single child
1602 //      collapse a level from the tree
1603
1604 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1605 {
1606 BtPageSet child[1];
1607 uint idx;
1608
1609   // find the child entry and promote as new root contents
1610
1611   do {
1612         for( idx = 0; idx++ < root->page->cnt; )
1613           if( !slotptr(root->page, idx)->dead )
1614                 break;
1615
1616         child->page_no = bt_getid (valptr(root->page, idx)->value);
1617
1618         child->latch = bt_pinlatch (bt, child->page_no);
1619         bt_lockpage (BtLockDelete, child->latch);
1620         bt_lockpage (BtLockWrite, child->latch);
1621
1622         if( child->pool = bt_pinpool (bt, child->page_no) )
1623                 child->page = bt_page (bt, child->pool, child->page_no);
1624         else
1625                 return bt->err;
1626
1627         memcpy (root->page, child->page, bt->mgr->page_size);
1628         bt_freepage (bt, child);
1629
1630   } while( root->page->lvl > 1 && root->page->act == 1 );
1631
1632   bt_unlockpage (BtLockWrite, root->latch);
1633   bt_unpinlatch (root->latch);
1634   bt_unpinpool (root->pool);
1635   return 0;
1636 }
1637
1638 //  find and delete key on page by marking delete flag bit
1639 //  if page becomes empty, delete it from the btree
1640
1641 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1642 {
1643 unsigned char lowerfence[256], higherfence[256];
1644 uint slot, idx, found, fence;
1645 BtPageSet set[1], right[1];
1646 unsigned char value[BtId];
1647 BtKey ptr, tst;
1648 BtVal val;
1649
1650         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1651                 ptr = keyptr(set->page, slot);
1652         else
1653                 return bt->err;
1654
1655         // if librarian slot, advance to real slot
1656
1657         if( slotptr(set->page, slot)->librarian )
1658                 ptr = keyptr(set->page, ++slot);
1659
1660         fence = slot == set->page->cnt;
1661
1662         // if key is found delete it, otherwise ignore request
1663
1664         if( found = !keycmp (ptr, key, len) )
1665           if( found = slotptr(set->page, slot)->dead == 0 ) {
1666                 val = valptr(set->page,slot);
1667                 slotptr(set->page, slot)->dead = 1;
1668                 set->page->garbage += ptr->len + val->len + 2;
1669                 set->page->act--;
1670
1671                 // collapse empty slots beneath the fence
1672
1673                 while( idx = set->page->cnt - 1 )
1674                   if( slotptr(set->page, idx)->dead ) {
1675                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1676                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1677                   } else
1678                         break;
1679           }
1680
1681         //      did we delete a fence key in an upper level?
1682
1683         if( found && fence && lvl && set->page->act )
1684           if( bt_fixfence (bt, set, lvl) )
1685                 return bt->err;
1686           else
1687                 return bt->found = found, 0;
1688
1689         //      is this a collapsed root?
1690
1691         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1692           if( bt_collapseroot (bt, set) )
1693                 return bt->err;
1694           else
1695                 return bt->found = found, 0;
1696
1697         //      return if page is not empty
1698
1699         if( set->page->act ) {
1700                 bt_unlockpage(BtLockWrite, set->latch);
1701                 bt_unpinlatch (set->latch);
1702                 bt_unpinpool (set->pool);
1703                 return bt->found = found, 0;
1704         }
1705
1706         //      cache copy of fence key
1707         //      to post in parent
1708
1709         ptr = keyptr(set->page, set->page->cnt);
1710         memcpy (lowerfence, ptr, ptr->len + 1);
1711
1712         //      obtain lock on right page
1713
1714         right->page_no = bt_getid(set->page->right);
1715         right->latch = bt_pinlatch (bt, right->page_no);
1716         bt_lockpage (BtLockWrite, right->latch);
1717
1718         // pin page contents
1719
1720         if( right->pool = bt_pinpool (bt, right->page_no) )
1721                 right->page = bt_page (bt, right->pool, right->page_no);
1722         else
1723                 return 0;
1724
1725         if( right->page->kill )
1726                 return bt->err = BTERR_struct;
1727
1728         // pull contents of right peer into our empty page
1729
1730         memcpy (set->page, right->page, bt->mgr->page_size);
1731
1732         // cache copy of key to update
1733
1734         ptr = keyptr(right->page, right->page->cnt);
1735         memcpy (higherfence, ptr, ptr->len + 1);
1736
1737         // mark right page deleted and point it to left page
1738         //      until we can post parent updates
1739
1740         bt_putid (right->page->right, set->page_no);
1741         right->page->kill = 1;
1742
1743         bt_lockpage (BtLockParent, right->latch);
1744         bt_unlockpage (BtLockWrite, right->latch);
1745
1746         bt_lockpage (BtLockParent, set->latch);
1747         bt_unlockpage (BtLockWrite, set->latch);
1748
1749         // redirect higher key directly to our new node contents
1750
1751         bt_putid (value, set->page_no);
1752
1753         if( bt_insertkey (bt, higherfence+1, *higherfence, lvl+1, value, BtId) )
1754           return bt->err;
1755
1756         //      delete old lower key to our node
1757
1758         if( bt_deletekey (bt, lowerfence+1, *lowerfence, lvl+1) )
1759           return bt->err;
1760
1761         //      obtain delete and write locks to right node
1762
1763         bt_unlockpage (BtLockParent, right->latch);
1764         bt_lockpage (BtLockDelete, right->latch);
1765         bt_lockpage (BtLockWrite, right->latch);
1766         bt_freepage (bt, right);
1767
1768         bt_unlockpage (BtLockParent, set->latch);
1769         bt_unpinlatch (set->latch);
1770         bt_unpinpool (set->pool);
1771         bt->found = found;
1772         return 0;
1773 }
1774
1775 //      find key in leaf level and return number of value bytes
1776 //      or (-1) if not found
1777
1778 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1779 {
1780 BtPageSet set[1];
1781 uint  slot;
1782 BtKey ptr;
1783 BtVal val;
1784 int ret;
1785
1786         if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1787                 ptr = keyptr(set->page, slot);
1788         else
1789                 return 0;
1790
1791         //      skip librarian slot place holder
1792
1793         if( slotptr(set->page, slot)->librarian )
1794                 ptr = keyptr(set->page, ++slot);
1795
1796         // if key exists, return >= 0 value bytes copied
1797         //      otherwise return (-1)
1798
1799         if( !slotptr(set->page, slot)->dead && !keycmp (ptr, key, keylen) ) {
1800                 val = valptr (set->page,slot);
1801                 if( valmax > val->len )
1802                         valmax = val->len;
1803                 memcpy (value, val->value, valmax);
1804                 ret = valmax;
1805         } else
1806                 ret = -1;
1807
1808         bt_unlockpage (BtLockRead, set->latch);
1809         bt_unpinlatch (set->latch);
1810         bt_unpinpool (set->pool);
1811         return ret;
1812 }
1813
1814 //      check page for space available,
1815 //      clean if necessary and return
1816 //      0 - page needs splitting
1817 //      >0  new slot value
1818
1819 uint bt_cleanpage(BtDb *bt, BtPage page, uint keylen, uint slot, uint vallen)
1820 {
1821 uint nxt = bt->mgr->page_size;
1822 uint cnt = 0, idx = 0;
1823 uint max = page->cnt;
1824 uint newslot = max;
1825 BtKey key;
1826 BtVal val;
1827
1828         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1)
1829                 return slot;
1830
1831         //      skip cleanup and proceed to split
1832         //      if there's not enough garbage
1833
1834         if( page->garbage < nxt / 5 )
1835                 return 0;
1836
1837         memcpy (bt->frame, page, bt->mgr->page_size);
1838
1839         // skip page info and set rest of page to zero
1840
1841         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1842         page->garbage = 0;
1843         page->act = 0;
1844
1845         // clean up page first by
1846         // removing deleted keys
1847
1848         while( cnt++ < max ) {
1849                 if( cnt == slot )
1850                         newslot = idx + 2;
1851                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1852                         continue;
1853
1854                 // copy the value across
1855
1856                 val = valptr(bt->frame, cnt);
1857                 nxt -= val->len + 1;
1858                 ((unsigned char *)page)[nxt] = val->len;
1859                 memcpy ((unsigned char *)page + nxt + 1, val->value, val->len);
1860
1861                 // copy the key across
1862
1863                 key = keyptr(bt->frame, cnt);
1864                 nxt -= key->len + 1;
1865                 memcpy ((unsigned char *)page + nxt, key, key->len + 1);
1866
1867                 // make a librarian slot
1868
1869                 if( idx ) {
1870                         slotptr(page, ++idx)->off = nxt;
1871                         slotptr(page, idx)->librarian = 1;
1872                         slotptr(page, idx)->dead = 1;
1873                 }
1874
1875                 // set up the slot
1876
1877                 slotptr(page, ++idx)->off = nxt;
1878
1879                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1880                         page->act++;
1881         }
1882
1883         page->min = nxt;
1884         page->cnt = idx;
1885
1886         //      see if page has enough space now, or does it need splitting?
1887
1888         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + 1 + vallen + 1 )
1889                 return newslot;
1890
1891         return 0;
1892 }
1893
1894 // split the root and raise the height of the btree
1895
1896 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, unsigned char *leftkey, uid page_no2)
1897 {
1898 uint nxt = bt->mgr->page_size;
1899 unsigned char value[BtId];
1900 uid left;
1901
1902         //  Obtain an empty page to use, and copy the current
1903         //  root contents into it, e.g. lower keys
1904
1905         if( !(left = bt_newpage(bt, root->page)) )
1906                 return bt->err;
1907
1908         // preserve the page info at the bottom
1909         // of higher keys and set rest to zero
1910
1911         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1912
1913         // insert lower keys page fence key on newroot page as first key
1914
1915         nxt -= BtId + 1;
1916         bt_putid (value, left);
1917         ((unsigned char *)root->page)[nxt] = BtId;
1918         memcpy ((unsigned char *)root->page + nxt + 1, value, BtId);
1919
1920         nxt -= *leftkey + 1;
1921         memcpy ((unsigned char *)root->page + nxt, leftkey, *leftkey + 1);
1922         slotptr(root->page, 1)->off = nxt;
1923         
1924         // insert stopper key on newroot page
1925         // and increase the root height
1926
1927         nxt -= 3 + BtId + 1;
1928         ((unsigned char *)root->page)[nxt] = 2;
1929         ((unsigned char *)root->page)[nxt+1] = 0xff;
1930         ((unsigned char *)root->page)[nxt+2] = 0xff;
1931
1932         bt_putid (value, page_no2);
1933         ((unsigned char *)root->page)[nxt+3] = BtId;
1934         memcpy ((unsigned char *)root->page + nxt + 4, value, BtId);
1935         slotptr(root->page, 2)->off = nxt;
1936
1937         bt_putid(root->page->right, 0);
1938         root->page->min = nxt;          // reset lowest used offset and key count
1939         root->page->cnt = 2;
1940         root->page->act = 2;
1941         root->page->lvl++;
1942
1943         // release and unpin root
1944
1945         bt_unlockpage(BtLockWrite, root->latch);
1946         bt_unpinlatch (root->latch);
1947         bt_unpinpool (root->pool);
1948         return 0;
1949 }
1950
1951 //  split already locked full node
1952 //      return unlocked.
1953
1954 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1955 {
1956 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1957 unsigned char fencekey[256], rightkey[256];
1958 unsigned char value[BtId];
1959 uint lvl = set->page->lvl;
1960 BtPageSet right[1];
1961 uint prev;
1962 BtKey key;
1963 BtVal val;
1964
1965         //  split higher half of keys to bt->frame
1966
1967         memset (bt->frame, 0, bt->mgr->page_size);
1968         max = set->page->cnt;
1969         cnt = max / 2;
1970         idx = 0;
1971
1972         while( cnt++ < max ) {
1973                 if( slotptr(set->page, cnt)->dead && cnt < max )
1974                         continue;
1975                 val = valptr(set->page, cnt);
1976                 nxt -= val->len + 1;
1977                 ((unsigned char *)bt->frame)[nxt] = val->len;
1978                 memcpy ((unsigned char *)bt->frame + nxt + 1, val->value, val->len);
1979
1980                 key = keyptr(set->page, cnt);
1981                 nxt -= key->len + 1;
1982                 memcpy ((unsigned char *)bt->frame + nxt, key, key->len + 1);
1983
1984                 //      add librarian slot
1985
1986                 if( idx ) {
1987                         slotptr(bt->frame, ++idx)->off = nxt;
1988                         slotptr(bt->frame, idx)->librarian = 1;
1989                         slotptr(bt->frame, idx)->dead = 1;
1990                 }
1991
1992                 //  add actual slot
1993
1994                 slotptr(bt->frame, ++idx)->off = nxt;
1995
1996                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1997                         bt->frame->act++;
1998         }
1999
2000         // remember existing fence key for new page to the right
2001
2002         memcpy (rightkey, key, key->len + 1);
2003
2004         bt->frame->bits = bt->mgr->page_bits;
2005         bt->frame->min = nxt;
2006         bt->frame->cnt = idx;
2007         bt->frame->lvl = lvl;
2008
2009         // link right node
2010
2011         if( set->page_no > ROOT_page )
2012                 memcpy (bt->frame->right, set->page->right, BtId);
2013
2014         //      get new free page and write higher keys to it.
2015
2016         if( !(right->page_no = bt_newpage(bt, bt->frame)) )
2017                 return bt->err;
2018
2019         //      update lower keys to continue in old page
2020
2021         memcpy (bt->frame, set->page, bt->mgr->page_size);
2022         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
2023         nxt = bt->mgr->page_size;
2024         set->page->garbage = 0;
2025         set->page->act = 0;
2026         max /= 2;
2027         cnt = 0;
2028         idx = 0;
2029
2030         if( slotptr(bt->frame, max)->librarian )
2031                 max--;
2032
2033         //  assemble page of smaller keys
2034
2035         while( cnt++ < max ) {
2036                 if( slotptr(bt->frame, cnt)->dead )
2037                         continue;
2038                 val = valptr(bt->frame, cnt);
2039                 nxt -= val->len + 1;
2040                 ((unsigned char *)set->page)[nxt] = val->len;
2041                 memcpy ((unsigned char *)set->page + nxt + 1, val->value, val->len);
2042
2043                 key = keyptr(bt->frame, cnt);
2044                 nxt -= key->len + 1;
2045                 memcpy ((unsigned char *)set->page + nxt, key, key->len + 1);
2046
2047                 //      add librarian slot
2048
2049                 if( idx ) {
2050                         slotptr(set->page, ++idx)->off = nxt;
2051                         slotptr(set->page, idx)->librarian = 1;
2052                         slotptr(set->page, idx)->dead = 1;
2053                 }
2054
2055                 //      add actual slot
2056
2057                 slotptr(set->page, ++idx)->off = nxt;
2058                 set->page->act++;
2059         }
2060
2061         // remember fence key for smaller page
2062
2063         memcpy(fencekey, key, key->len + 1);
2064
2065         bt_putid(set->page->right, right->page_no);
2066         set->page->min = nxt;
2067         set->page->cnt = idx;
2068
2069         // if current page is the root page, split it
2070
2071         if( set->page_no == ROOT_page )
2072                 return bt_splitroot (bt, set, fencekey, right->page_no);
2073
2074         // insert new fences in their parent pages
2075
2076         right->latch = bt_pinlatch (bt, right->page_no);
2077         bt_lockpage (BtLockParent, right->latch);
2078
2079         bt_lockpage (BtLockParent, set->latch);
2080         bt_unlockpage (BtLockWrite, set->latch);
2081
2082         // insert new fence for reformulated left block of smaller keys
2083
2084         bt_putid (value, set->page_no);
2085
2086         if( bt_insertkey (bt, fencekey+1, *fencekey, lvl+1, value, BtId) )
2087                 return bt->err;
2088
2089         // switch fence for right block of larger keys to new right page
2090
2091         bt_putid (value, right->page_no);
2092
2093         if( bt_insertkey (bt, rightkey+1, *rightkey, lvl+1, value, BtId) )
2094                 return bt->err;
2095
2096         bt_unlockpage (BtLockParent, set->latch);
2097         bt_unpinlatch (set->latch);
2098         bt_unpinpool (set->pool);
2099
2100         bt_unlockpage (BtLockParent, right->latch);
2101         bt_unpinlatch (right->latch);
2102         return 0;
2103 }
2104
2105 //      install new key and value onto page
2106 //      page must already be checked for
2107 //      adequate space
2108
2109 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen)
2110 {
2111 uint idx, librarian;
2112 BtSlot *node;
2113
2114         //      if found slot > desired slot and previous slot
2115         //      is a librarian slot, use it
2116
2117         if( slot > 1 )
2118           if( slotptr(set->page, slot-1)->librarian )
2119                 slot--;
2120
2121         // copy value onto page
2122
2123         set->page->min -= vallen + 1; // reset lowest used offset
2124         ((unsigned char *)set->page)[set->page->min] = vallen;
2125         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen );
2126
2127         // copy key onto page
2128
2129         set->page->min -= keylen + 1; // reset lowest used offset
2130         ((unsigned char *)set->page)[set->page->min] = keylen;
2131         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen );
2132
2133         //      find first empty slot
2134
2135         for( idx = slot; idx < set->page->cnt; idx++ )
2136           if( slotptr(set->page, idx)->dead )
2137                 break;
2138
2139         // now insert key into array before slot
2140
2141         if( idx == set->page->cnt )
2142                 idx += 2, set->page->cnt += 2, librarian = 2;
2143         else
2144                 librarian = 1;
2145
2146         set->page->act++;
2147
2148         while( idx > slot + librarian - 1 )
2149                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2150
2151         //      add librarian slot
2152
2153         if( librarian > 1 ) {
2154                 node = slotptr(set->page, slot++);
2155                 node->off = set->page->min;
2156                 node->librarian = 1;
2157                 node->dead = 1;
2158         }
2159
2160         //      fill in new slot
2161
2162         node = slotptr(set->page, slot);
2163         node->off = set->page->min;
2164         node->librarian = 0;
2165         node->dead = 0;
2166
2167         bt_unlockpage (BtLockWrite, set->latch);
2168         bt_unpinlatch (set->latch);
2169         bt_unpinpool (set->pool);
2170         return 0;
2171 }
2172
2173 //  Insert new key into the btree at given level.
2174 //      either add a new key or update/add an existing one
2175
2176 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen)
2177 {
2178 BtPageSet set[1];
2179 uint slot, idx;
2180 uid sequence;
2181 BtKey ptr;
2182 BtVal val;
2183 uint type;
2184
2185   while( 1 ) { // find the page and slot for the current key
2186         if( slot = bt_loadpage (bt, set, key, keylen, lvl, BtLockWrite) )
2187                 ptr = keyptr(set->page, slot);
2188         else {
2189                 if( !bt->err )
2190                         bt->err = BTERR_ovflw;
2191                 return bt->err;
2192         }
2193
2194         // if librarian slot == found slot, advance to real slot
2195
2196         if( slotptr(set->page, slot)->librarian )
2197           if( !keycmp (ptr, key, keylen) )
2198                 ptr = keyptr(set->page, ++slot);
2199
2200         //      check for adequate space on the page
2201         //      and insert the new key before slot.
2202
2203         if( keycmp (ptr, key, keylen) ) {
2204           if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2205                 if( bt_splitpage (bt, set) )
2206                   return bt->err;
2207                 else
2208                   continue;
2209
2210           return bt_insertslot (bt, set, slot, key, keylen, value, vallen);
2211         }
2212
2213         // if key already exists, update value and return
2214
2215         if( val = valptr(set->page, slot), val->len >= vallen ) {
2216                 if( slotptr(set->page, slot)->dead )
2217                         set->page->act++;
2218                 set->page->garbage += val->len - vallen;
2219                 slotptr(set->page, slot)->dead = 0;
2220                 val->len = vallen;
2221                 memcpy (val->value, value, vallen);
2222                 bt_unlockpage(BtLockWrite, set->latch);
2223                 bt_unpinlatch (set->latch);
2224                 bt_unpinpool (set->pool);
2225                 return 0;
2226         }
2227
2228         //      new update value doesn't fit in existing value area
2229
2230         if( !slotptr(set->page, slot)->dead )
2231                 set->page->garbage += val->len + ptr->len + 2;
2232         else {
2233                 slotptr(set->page, slot)->dead = 0;
2234                 set->page->act++;
2235         }
2236
2237         if( !(slot = bt_cleanpage (bt, set->page, keylen, slot, vallen)) )
2238           if( bt_splitpage (bt, set) )
2239                 return bt->err;
2240           else
2241                 continue;
2242
2243         set->page->min -= vallen + 1;
2244         ((unsigned char *)set->page)[set->page->min] = vallen;
2245         memcpy ((unsigned char *)set->page + set->page->min +1, value, vallen);
2246
2247         set->page->min -= keylen + 1;
2248         ((unsigned char *)set->page)[set->page->min] = keylen;
2249         memcpy ((unsigned char *)set->page + set->page->min +1, key, keylen);
2250         
2251         slotptr(set->page, slot)->off = set->page->min;
2252         bt_unlockpage(BtLockWrite, set->latch);
2253         bt_unpinlatch (set->latch);
2254         bt_unpinpool (set->pool);
2255         return 0;
2256   }
2257   return 0;
2258 }
2259
2260 //  cache page of keys into cursor and return starting slot for given key
2261
2262 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2263 {
2264 BtPageSet set[1];
2265 uint slot;
2266
2267         // cache page for retrieval
2268
2269         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2270           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2271         else
2272           return 0;
2273
2274         bt->cursor_page = set->page_no;
2275
2276         bt_unlockpage(BtLockRead, set->latch);
2277         bt_unpinlatch (set->latch);
2278         bt_unpinpool (set->pool);
2279         return slot;
2280 }
2281
2282 //  return next slot for cursor page
2283 //  or slide cursor right into next page
2284
2285 uint bt_nextkey (BtDb *bt, uint slot)
2286 {
2287 BtPageSet set[1];
2288 uid right;
2289
2290   do {
2291         right = bt_getid(bt->cursor->right);
2292
2293         while( slot++ < bt->cursor->cnt )
2294           if( slotptr(bt->cursor,slot)->dead )
2295                 continue;
2296           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2297                 return slot;
2298           else
2299                 break;
2300
2301         if( !right )
2302                 break;
2303
2304         bt->cursor_page = right;
2305
2306         if( set->pool = bt_pinpool (bt, right) )
2307                 set->page = bt_page (bt, set->pool, right);
2308         else
2309                 return 0;
2310
2311         set->latch = bt_pinlatch (bt, right);
2312     bt_lockpage(BtLockRead, set->latch);
2313
2314         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2315
2316         bt_unlockpage(BtLockRead, set->latch);
2317         bt_unpinlatch (set->latch);
2318         bt_unpinpool (set->pool);
2319         slot = 0;
2320
2321   } while( 1 );
2322
2323   return bt->err = 0;
2324 }
2325
2326 BtKey bt_key(BtDb *bt, uint slot)
2327 {
2328         return keyptr(bt->cursor, slot);
2329 }
2330
2331 BtVal bt_val(BtDb *bt, uint slot)
2332 {
2333         return valptr(bt->cursor,slot);
2334 }
2335
2336 #ifdef STANDALONE
2337
2338 #ifndef unix
2339 double getCpuTime(int type)
2340 {
2341 FILETIME crtime[1];
2342 FILETIME xittime[1];
2343 FILETIME systime[1];
2344 FILETIME usrtime[1];
2345 SYSTEMTIME timeconv[1];
2346 double ans = 0;
2347
2348         memset (timeconv, 0, sizeof(SYSTEMTIME));
2349
2350         switch( type ) {
2351         case 0:
2352                 GetSystemTimeAsFileTime (xittime);
2353                 FileTimeToSystemTime (xittime, timeconv);
2354                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2355                 break;
2356         case 1:
2357                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2358                 FileTimeToSystemTime (usrtime, timeconv);
2359                 break;
2360         case 2:
2361                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2362                 FileTimeToSystemTime (systime, timeconv);
2363                 break;
2364         }
2365
2366         ans += (double)timeconv->wHour * 3600;
2367         ans += (double)timeconv->wMinute * 60;
2368         ans += (double)timeconv->wSecond;
2369         ans += (double)timeconv->wMilliseconds / 1000;
2370         return ans;
2371 }
2372 #else
2373 #include <time.h>
2374 #include <sys/resource.h>
2375
2376 double getCpuTime(int type)
2377 {
2378 struct rusage used[1];
2379 struct timeval tv[1];
2380
2381         switch( type ) {
2382         case 0:
2383                 gettimeofday(tv, NULL);
2384                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2385
2386         case 1:
2387                 getrusage(RUSAGE_SELF, used);
2388                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2389
2390         case 2:
2391                 getrusage(RUSAGE_SELF, used);
2392                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2393         }
2394
2395         return 0;
2396 }
2397 #endif
2398
2399 uint bt_latchaudit (BtDb *bt)
2400 {
2401 ushort idx, hashidx;
2402 uid next, page_no;
2403 BtLatchSet *latch;
2404 uint cnt = 0;
2405 BtKey ptr;
2406
2407 #ifdef unix
2408         posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2409 #endif
2410         if( *(ushort *)(bt->mgr->latchmgr->lock) )
2411                 fprintf(stderr, "Alloc page locked\n");
2412         *(ushort *)(bt->mgr->latchmgr->lock) = 0;
2413
2414         for( idx = 1; idx <= bt->mgr->latchmgr->latchdeployed; idx++ ) {
2415                 latch = bt->mgr->latchsets + idx;
2416                 if( *latch->readwr->rin & MASK )
2417                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2418                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2419
2420                 if( *latch->access->rin & MASK )
2421                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2422                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2423
2424                 if( *latch->parent->rin & MASK )
2425                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2426                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2427
2428                 if( latch->pin ) {
2429                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2430                         latch->pin = 0;
2431                 }
2432         }
2433
2434         for( hashidx = 0; hashidx < bt->mgr->latchmgr->latchhash; hashidx++ ) {
2435           if( *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) )
2436                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2437
2438           *(ushort *)(bt->mgr->latchmgr->table[hashidx].latch) = 0;
2439
2440           if( idx = bt->mgr->latchmgr->table[hashidx].slot ) do {
2441                 latch = bt->mgr->latchsets + idx;
2442                 if( *(ushort *)latch->busy )
2443                         fprintf(stderr, "latchset %d busylocked for page %.8x\n", idx, latch->page_no);
2444                 *(ushort *)latch->busy = 0;
2445                 if( latch->pin )
2446                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2447           } while( idx = latch->next );
2448         }
2449
2450         next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2451         page_no = LEAF_page;
2452
2453         while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2454         off64_t off = page_no << bt->mgr->page_bits;
2455 #ifdef unix
2456                 pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2457 #else
2458                 DWORD amt[1];
2459
2460                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2461
2462                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2463                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2464
2465                   if( *amt <  bt->mgr->page_size )
2466                         fprintf(stderr, "page %.8x unable to read\n", page_no);
2467 #endif
2468                 if( !bt->frame->free ) {
2469                  for( idx = 0; idx++ < bt->frame->cnt - 1; ) {
2470                   ptr = keyptr(bt->frame, idx+1);
2471                   if( keycmp (keyptr(bt->frame, idx), ptr->key, ptr->len) >= 0 )
2472                         fprintf(stderr, "page %.8x idx %.2x out of order\n", page_no, idx);
2473                  }
2474                  if( !bt->frame->lvl )
2475                         cnt += bt->frame->act;
2476                 }
2477                 if( page_no > LEAF_page )
2478                         next = page_no + 1;
2479                 page_no = next;
2480         }
2481         return cnt - 1;
2482 }
2483
2484 typedef struct {
2485         char idx;
2486         char *type;
2487         char *infile;
2488         BtMgr *mgr;
2489         int num;
2490 } ThreadArg;
2491
2492 //  standalone program to index file of keys
2493 //  then list them onto std-out
2494
2495 #ifdef unix
2496 void *index_file (void *arg)
2497 #else
2498 uint __stdcall index_file (void *arg)
2499 #endif
2500 {
2501 int line = 0, found = 0, cnt = 0;
2502 uid next, page_no = LEAF_page;  // start on first page of leaves
2503 unsigned char key[256];
2504 ThreadArg *args = arg;
2505 int ch, len = 0, slot;
2506 BtPageSet set[1];
2507 BtKey ptr;
2508 BtDb *bt;
2509 FILE *in;
2510
2511         bt = bt_open (args->mgr);
2512
2513         switch(args->type[0] | 0x20)
2514         {
2515         case 'a':
2516                 fprintf(stderr, "started latch mgr audit\n");
2517                 cnt = bt_latchaudit (bt);
2518                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2519                 break;
2520
2521         case 'p':
2522                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2523                 if( in = fopen (args->infile, "rb") )
2524                   while( ch = getc(in), ch != EOF )
2525                         if( ch == '\n' )
2526                         {
2527                           line++;
2528
2529                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10) )
2530                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2531                           len = 0;
2532                         }
2533                         else if( len < 255 )
2534                                 key[len++] = ch;
2535                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2536                 break;
2537
2538         case 'w':
2539                 fprintf(stderr, "started indexing for %s\n", args->infile);
2540                 if( in = fopen (args->infile, "rb") )
2541                   while( ch = getc(in), ch != EOF )
2542                         if( ch == '\n' )
2543                         {
2544                           line++;
2545
2546                           if( args->num == 1 )
2547                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2548
2549                           else if( args->num )
2550                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2551
2552                           if( bt_insertkey (bt, key, len, 0, NULL, 0) )
2553                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2554                           len = 0;
2555                         }
2556                         else if( len < 255 )
2557                                 key[len++] = ch;
2558                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
2559                 break;
2560
2561         case 'd':
2562                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2563                 if( in = fopen (args->infile, "rb") )
2564                   while( ch = getc(in), ch != EOF )
2565                         if( ch == '\n' )
2566                         {
2567                           line++;
2568                           if( args->num == 1 )
2569                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2570
2571                           else if( args->num )
2572                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2573
2574                           if( bt_deletekey (bt, key, len, 0) )
2575                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2576                           found += bt->found;
2577                           len = 0;
2578                         }
2579                         else if( len < 255 )
2580                                 key[len++] = ch;
2581                 fprintf(stderr, "finished %s for %d keys, found %d \n", args->infile, line, found);
2582                 break;
2583
2584         case 'f':
2585                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2586                 if( in = fopen (args->infile, "rb") )
2587                   while( ch = getc(in), ch != EOF )
2588                         if( ch == '\n' )
2589                         {
2590                           line++;
2591                           if( args->num == 1 )
2592                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2593
2594                           else if( args->num )
2595                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2596
2597                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2598                                 found++;
2599                           else if( bt->err )
2600                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2601                           len = 0;
2602                         }
2603                         else if( len < 255 )
2604                                 key[len++] = ch;
2605                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
2606                 break;
2607
2608         case 's':
2609                 fprintf(stderr, "started scanning\n");
2610                 do {
2611                         if( set->pool = bt_pinpool (bt, page_no) )
2612                                 set->page = bt_page (bt, set->pool, page_no);
2613                         else
2614                                 break;
2615                         set->latch = bt_pinlatch (bt, page_no);
2616                         bt_lockpage (BtLockRead, set->latch);
2617                         next = bt_getid (set->page->right);
2618
2619                         for( slot = 0; slot++ < set->page->cnt; )
2620                          if( next || slot < set->page->cnt )
2621                           if( !slotptr(set->page, slot)->dead ) {
2622                                 ptr = keyptr(set->page, slot);
2623                                 fwrite (ptr->key, ptr->len, 1, stdout);
2624                                 fputc ('\n', stdout);
2625                                 cnt++;
2626                           }
2627
2628                         bt_unlockpage (BtLockRead, set->latch);
2629                         bt_unpinlatch (set->latch);
2630                         bt_unpinpool (set->pool);
2631                 } while( page_no = next );
2632
2633                 fprintf(stderr, " Total keys read %d\n", cnt);
2634                 break;
2635
2636         case 'c':
2637 #ifdef unix
2638                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2639 #endif
2640                 fprintf(stderr, "started counting\n");
2641                 next = bt->mgr->latchmgr->nlatchpage + LATCH_page;
2642                 page_no = LEAF_page;
2643
2644                 while( page_no < bt_getid(bt->mgr->latchmgr->alloc->right) ) {
2645                 uid off = page_no << bt->mgr->page_bits;
2646 #ifdef unix
2647                   pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2648 #else
2649                 DWORD amt[1];
2650
2651                   SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2652
2653                   if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2654                         return bt->err = BTERR_map;
2655
2656                   if( *amt <  bt->mgr->page_size )
2657                         return bt->err = BTERR_map;
2658 #endif
2659                         if( !bt->frame->free && !bt->frame->lvl )
2660                                 cnt += bt->frame->act;
2661                         if( page_no > LEAF_page )
2662                                 next = page_no + 1;
2663                         page_no = next;
2664                 }
2665                 
2666                 cnt--;  // remove stopper key
2667                 fprintf(stderr, " Total keys read %d\n", cnt);
2668                 break;
2669         }
2670
2671         bt_close (bt);
2672 #ifdef unix
2673         return NULL;
2674 #else
2675         return 0;
2676 #endif
2677 }
2678
2679 typedef struct timeval timer;
2680
2681 int main (int argc, char **argv)
2682 {
2683 int idx, cnt, len, slot, err;
2684 int segsize, bits = 16;
2685 double start, stop;
2686 #ifdef unix
2687 pthread_t *threads;
2688 #else
2689 HANDLE *threads;
2690 #endif
2691 ThreadArg *args;
2692 uint poolsize = 0;
2693 float elapsed;
2694 int num = 0;
2695 char key[1];
2696 BtMgr *mgr;
2697 BtKey ptr;
2698 BtDb *bt;
2699
2700         if( argc < 3 ) {
2701                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits mapped_segments seg_bits line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2702                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2703                 fprintf (stderr, "  mapped_segments is the number of mmap segments in buffer pool\n");
2704                 fprintf (stderr, "  seg_bits is the size of individual segments in buffer pool in pages in bits\n");
2705                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2706                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2707                 exit(0);
2708         }
2709
2710         start = getCpuTime(0);
2711
2712         if( argc > 3 )
2713                 bits = atoi(argv[3]);
2714
2715         if( argc > 4 )
2716                 poolsize = atoi(argv[4]);
2717
2718         if( !poolsize )
2719                 fprintf (stderr, "Warning: no mapped_pool\n");
2720
2721         if( poolsize > 65535 )
2722                 fprintf (stderr, "Warning: mapped_pool > 65535 segments\n");
2723
2724         if( argc > 5 )
2725                 segsize = atoi(argv[5]);
2726         else
2727                 segsize = 4;    // 16 pages per mmap segment
2728
2729         if( argc > 6 )
2730                 num = atoi(argv[6]);
2731
2732         cnt = argc - 7;
2733 #ifdef unix
2734         threads = malloc (cnt * sizeof(pthread_t));
2735 #else
2736         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2737 #endif
2738         args = malloc (cnt * sizeof(ThreadArg));
2739
2740         mgr = bt_mgr ((argv[1]), BT_rw, bits, poolsize, segsize, poolsize / 8);
2741
2742         if( !mgr ) {
2743                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2744                 exit (1);
2745         }
2746
2747         //      fire off threads
2748
2749         for( idx = 0; idx < cnt; idx++ ) {
2750                 args[idx].infile = argv[idx + 7];
2751                 args[idx].type = argv[2];
2752                 args[idx].mgr = mgr;
2753                 args[idx].num = num;
2754                 args[idx].idx = idx;
2755 #ifdef unix
2756                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2757                         fprintf(stderr, "Error creating thread %d\n", err);
2758 #else
2759                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2760 #endif
2761         }
2762
2763         //      wait for termination
2764
2765 #ifdef unix
2766         for( idx = 0; idx < cnt; idx++ )
2767                 pthread_join (threads[idx], NULL);
2768 #else
2769         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2770
2771         for( idx = 0; idx < cnt; idx++ )
2772                 CloseHandle(threads[idx]);
2773
2774 #endif
2775         elapsed = getCpuTime(0) - start;
2776         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2777         elapsed = getCpuTime(1);
2778         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2779         elapsed = getCpuTime(2);
2780         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2781
2782         bt_mgrclose (mgr);
2783 }
2784
2785 #endif  //STANDALONE