]> pd.if.org Git - btree/blob - threadskv6.c
Add finalized threadskv10g LSM btree.
[btree] / threadskv6.c
1 // btree version threadskv6 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      and traditional buffer pool manager
8
9 // 10 SEP 2014
10
11 // author: karl malbrain, malbrain@cal.berkeley.edu
12
13 /*
14 This work, including the source code, documentation
15 and related data, is placed into the public domain.
16
17 The orginal author is Karl Malbrain.
18
19 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
20 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
21 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
22 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
23 RESULTING FROM THE USE, MODIFICATION, OR
24 REDISTRIBUTION OF THIS SOFTWARE.
25 */
26
27 // Please see the project home page for documentation
28 // code.google.com/p/high-concurrency-btree
29
30 #define _FILE_OFFSET_BITS 64
31 #define _LARGEFILE64_SOURCE
32
33 #ifdef linux
34 #define _GNU_SOURCE
35 #endif
36
37 #ifdef unix
38 #include <unistd.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <fcntl.h>
42 #include <sys/time.h>
43 #include <sys/mman.h>
44 #include <errno.h>
45 #include <pthread.h>
46 #else
47 #define WIN32_LEAN_AND_MEAN
48 #include <windows.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51 #include <time.h>
52 #include <fcntl.h>
53 #include <process.h>
54 #include <intrin.h>
55 #endif
56
57 #include <memory.h>
58 #include <string.h>
59 #include <stddef.h>
60
61 typedef unsigned long long      uid;
62
63 #ifndef unix
64 typedef unsigned long long      off64_t;
65 typedef unsigned short          ushort;
66 typedef unsigned int            uint;
67 #endif
68
69 #define BT_ro 0x6f72    // ro
70 #define BT_rw 0x7772    // rw
71
72 #define BT_maxbits              24                                      // maximum page size in bits
73 #define BT_minbits              9                                       // minimum page size in bits
74 #define BT_minpage              (1 << BT_minbits)       // minimum page size
75 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
76
77 //  BTree page number constants
78 #define ALLOC_page              0       // allocation page
79 #define ROOT_page               1       // root of the btree
80 #define LEAF_page               2       // first page of leaves
81
82 //      Number of levels to create in a new BTree
83
84 #define MIN_lvl                 2
85
86 /*
87 There are five lock types for each node in three independent sets: 
88 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
89 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
90 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
91 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
92 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
93 */
94
95 typedef enum{
96         BtLockAccess,
97         BtLockDelete,
98         BtLockRead,
99         BtLockWrite,
100         BtLockParent
101 } BtLock;
102
103 //      definition for phase-fair reader/writer lock implementation
104
105 typedef struct {
106         ushort rin[1];
107         ushort rout[1];
108         ushort ticket[1];
109         ushort serving[1];
110 } RWLock;
111
112 #define PHID 0x1
113 #define PRES 0x2
114 #define MASK 0x3
115 #define RINC 0x4
116
117 //      definition for spin latch implementation
118
119 // exclusive is set for write access
120 // share is count of read accessors
121 // grant write lock when share == 0
122
123 volatile typedef struct {
124         ushort exclusive:1;
125         ushort pending:1;
126         ushort share:14;
127 } BtSpinLatch;
128
129 #define XCL 1
130 #define PEND 2
131 #define BOTH 3
132 #define SHARE 4
133
134 //  hash table entries
135
136 typedef struct {
137         volatile uint slot;             // Latch table entry at head of chain
138         BtSpinLatch latch[1];
139 } BtHashEntry;
140
141 //      latch manager table structure
142
143 typedef struct {
144         volatile uid page_no;   // latch set page number
145         RWLock readwr[1];               // read/write page lock
146         RWLock access[1];               // Access Intent/Page delete
147         RWLock parent[1];               // Posting of fence key in parent
148         uint slot;                              // entry slot in latch table
149         uint next;                              // next entry in hash table chain
150         uint prev;                              // prev entry in hash table chain
151         volatile ushort pin;    // number of outstanding threads
152         ushort dirty:1;                 // page in cache is dirty
153 } BtLatchSet;
154
155 //      Define the length of the page record numbers
156
157 #define BtId 6
158
159 //      Page key slot definition.
160
161 //      Keys are marked dead, but remain on the page until
162 //      it cleanup is called. The fence key (highest key) for
163 //      a leaf page is always present, even after cleanup.
164
165 //      Slot types
166
167 //      In addition to the Unique keys that occupy slots
168 //      there are Librarian and Duplicate key
169 //      slots occupying the key slot array.
170
171 //      The Librarian slots are dead keys that
172 //      serve as filler, available to add new Unique
173 //      or Dup slots that are inserted into the B-tree.
174
175 //      The Duplicate slots have had their key bytes extended
176 //      by 6 bytes to contain a binary duplicate key uniqueifier.
177
178 typedef enum {
179         Unique,
180         Librarian,
181         Duplicate
182 } BtSlotType;
183
184 typedef struct {
185         uint off:BT_maxbits;    // page offset for key start
186         uint type:3;                    // type of slot
187         uint dead:1;                    // set for deleted slot
188 } BtSlot;
189
190 //      The key structure occupies space at the upper end of
191 //      each page.  It's a length byte followed by the key
192 //      bytes.
193
194 typedef struct {
195         unsigned char len;              // this can be changed to a ushort or uint
196         unsigned char key[0];
197 } BtKey;
198
199 //      the value structure also occupies space at the upper
200 //      end of the page. Each key is immediately followed by a value.
201
202 typedef struct {
203         unsigned char len;              // this can be changed to a ushort or uint
204         unsigned char value[0];
205 } BtVal;
206
207 #define BT_maxkey       255             // maximum number of bytes in a key
208 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
209
210 //      The first part of an index page.
211 //      It is immediately followed
212 //      by the BtSlot array of keys.
213
214 //      note that this structure size
215 //      must be a multiple of 8 bytes
216 //      in order to place dups correctly.
217
218 typedef struct BtPage_ {
219         uint cnt;                                       // count of keys in page
220         uint act;                                       // count of active keys
221         uint min;                                       // next key offset
222         uint garbage;                           // page garbage in bytes
223         unsigned char bits:7;           // page size in bits
224         unsigned char free:1;           // page is on free chain
225         unsigned char lvl:7;            // level of page
226         unsigned char kill:1;           // page is being deleted
227         unsigned char left[BtId];       // page number to left
228         unsigned char filler[2];        // padding to multiple of 8
229         unsigned char right[BtId];      // page number to right
230 } *BtPage;
231
232 //  The loadpage interface object
233
234 typedef struct {
235         uid page_no;            // current page number
236         BtPage page;            // current page pointer
237         BtLatchSet *latch;      // current page latch set
238 } BtPageSet;
239
240 //      structure for latch manager on ALLOC_page
241
242 typedef struct {
243         struct BtPage_ alloc[1];        // next page_no in right ptr
244         unsigned long long dups[1];     // global duplicate key uniqueifier
245         unsigned char chain[BtId];      // head of free page_nos chain
246 } BtPageZero;
247
248 //      The object structure for Btree access
249
250 typedef struct {
251         uint page_size;                         // page size    
252         uint page_bits;                         // page size in bits    
253 #ifdef unix
254         int idx;
255 #else
256         HANDLE idx;
257 #endif
258         BtPageZero *pagezero;           // mapped allocation page
259         BtSpinLatch lock[1];            // allocation area lite latch
260         uint latchdeployed;                     // highest number of latch entries deployed
261         uint nlatchpage;                        // number of latch pages at BT_latch
262         uint latchtotal;                        // number of page latch entries
263         uint latchhash;                         // number of latch hash table slots
264         uint latchvictim;                       // next latch entry to examine
265         BtHashEntry *hashtable;         // the anonymous mapping buffer pool
266         BtLatchSet *latchsets;          // mapped latch set from latch pages
267         unsigned char *pagepool;        // mapped to the buffer pool pages
268 #ifndef unix
269         HANDLE halloc;                          // allocation handle
270         HANDLE hpool;                           // buffer pool handle
271 #endif
272 } BtMgr;
273
274 typedef struct {
275         BtMgr *mgr;                             // buffer manager for thread
276         BtPage cursor;                  // cached frame for start/next (never mapped)
277         BtPage frame;                   // spare frame for the page split (never mapped)
278         uid cursor_page;                                // current cursor page number   
279         unsigned char *mem;                             // frame, cursor, page memory buffer
280         unsigned char key[BT_keyarray]; // last found complete key
281         int found;                              // last delete or insert was found
282         int err;                                // last error
283         int reads, writes;              // number of reads and writes from the btree
284 } BtDb;
285
286 typedef enum {
287         BTERR_ok = 0,
288         BTERR_struct,
289         BTERR_ovflw,
290         BTERR_lock,
291         BTERR_map,
292         BTERR_read,
293         BTERR_wrt,
294         BTERR_hash
295 } BTERR;
296
297 #define CLOCK_bit 0x8000
298
299 // B-Tree functions
300 extern void bt_close (BtDb *bt);
301 extern BtDb *bt_open (BtMgr *mgr);
302 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
303 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
304 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
305 extern BtKey *bt_foundkey (BtDb *bt);
306 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
307 extern uint bt_nextkey   (BtDb *bt, uint slot);
308
309 //      manager functions
310 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
311 void bt_mgrclose (BtMgr *mgr);
312
313 //  Helper functions to return slot values
314 //      from the cursor page.
315
316 extern BtKey *bt_key (BtDb *bt, uint slot);
317 extern BtVal *bt_val (BtDb *bt, uint slot);
318
319 //  The page is allocated from low and hi ends.
320 //  The key slots are allocated from the bottom,
321 //      while the text and value of the key
322 //  are allocated from the top.  When the two
323 //  areas meet, the page is split into two.
324
325 //  A key consists of a length byte, two bytes of
326 //  index number (0 - 65535), and up to 253 bytes
327 //  of key value.
328
329 //  Associated with each key is a value byte string
330 //      containing any value desired.
331
332 //  The b-tree root is always located at page 1.
333 //      The first leaf page of level zero is always
334 //      located on page 2.
335
336 //      The b-tree pages are linked with next
337 //      pointers to facilitate enumerators,
338 //      and provide for concurrency.
339
340 //      When to root page fills, it is split in two and
341 //      the tree height is raised by a new root at page
342 //      one with two keys.
343
344 //      Deleted keys are marked with a dead bit until
345 //      page cleanup. The fence key for a leaf node is
346 //      always present
347
348 //  To achieve maximum concurrency one page is locked at a time
349 //  as the tree is traversed to find leaf key in question. The right
350 //  page numbers are used in cases where the page is being split,
351 //      or consolidated.
352
353 //  Page 0 is dedicated to lock for new page extensions,
354 //      and chains empty pages together for reuse. It also
355 //      contains the latch manager hash table.
356
357 //      The ParentModification lock on a node is obtained to serialize posting
358 //      or changing the fence key for a node.
359
360 //      Empty pages are chained together through the ALLOC page and reused.
361
362 //      Access macros to address slot and key values from the page
363 //      Page slots use 1 based indexing.
364
365 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
366 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
367 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
368
369 void bt_putid(unsigned char *dest, uid id)
370 {
371 int i = BtId;
372
373         while( i-- )
374                 dest[i] = (unsigned char)id, id >>= 8;
375 }
376
377 uid bt_getid(unsigned char *src)
378 {
379 uid id = 0;
380 int i;
381
382         for( i = 0; i < BtId; i++ )
383                 id <<= 8, id |= *src++; 
384
385         return id;
386 }
387
388 uid bt_newdup (BtDb *bt)
389 {
390 #ifdef unix
391         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
392 #else
393         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
394 #endif
395 }
396
397 //      Phase-Fair reader/writer lock implementation
398
399 void WriteLock (RWLock *lock)
400 {
401 ushort w, r, tix;
402
403 #ifdef unix
404         tix = __sync_fetch_and_add (lock->ticket, 1);
405 #else
406         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
407 #endif
408         // wait for our ticket to come up
409
410         while( tix != lock->serving[0] )
411 #ifdef unix
412                 sched_yield();
413 #else
414                 SwitchToThread ();
415 #endif
416
417         w = PRES | (tix & PHID);
418 #ifdef  unix
419         r = __sync_fetch_and_add (lock->rin, w);
420 #else
421         r = _InterlockedExchangeAdd16 (lock->rin, w);
422 #endif
423         while( r != *lock->rout )
424 #ifdef unix
425                 sched_yield();
426 #else
427                 SwitchToThread();
428 #endif
429 }
430
431 void WriteRelease (RWLock *lock)
432 {
433 #ifdef unix
434         __sync_fetch_and_and (lock->rin, ~MASK);
435 #else
436         _InterlockedAnd16 (lock->rin, ~MASK);
437 #endif
438         lock->serving[0]++;
439 }
440
441 void ReadLock (RWLock *lock)
442 {
443 ushort w;
444 #ifdef unix
445         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
446 #else
447         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
448 #endif
449         if( w )
450           while( w == (*lock->rin & MASK) )
451 #ifdef unix
452                 sched_yield ();
453 #else
454                 SwitchToThread ();
455 #endif
456 }
457
458 void ReadRelease (RWLock *lock)
459 {
460 #ifdef unix
461         __sync_fetch_and_add (lock->rout, RINC);
462 #else
463         _InterlockedExchangeAdd16 (lock->rout, RINC);
464 #endif
465 }
466
467 //      Spin Latch Manager
468
469 //      wait until write lock mode is clear
470 //      and add 1 to the share count
471
472 void bt_spinreadlock(BtSpinLatch *latch)
473 {
474 ushort prev;
475
476   do {
477 #ifdef unix
478         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
479 #else
480         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
481 #endif
482         //  see if exclusive request is granted or pending
483
484         if( !(prev & BOTH) )
485                 return;
486 #ifdef unix
487         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
488 #else
489         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
490 #endif
491 #ifdef  unix
492   } while( sched_yield(), 1 );
493 #else
494   } while( SwitchToThread(), 1 );
495 #endif
496 }
497
498 //      wait for other read and write latches to relinquish
499
500 void bt_spinwritelock(BtSpinLatch *latch)
501 {
502 ushort prev;
503
504   do {
505 #ifdef  unix
506         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
507 #else
508         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
509 #endif
510         if( !(prev & XCL) )
511           if( !(prev & ~BOTH) )
512                 return;
513           else
514 #ifdef unix
515                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
516 #else
517                 _InterlockedAnd16((ushort *)latch, ~XCL);
518 #endif
519 #ifdef  unix
520   } while( sched_yield(), 1 );
521 #else
522   } while( SwitchToThread(), 1 );
523 #endif
524 }
525
526 //      try to obtain write lock
527
528 //      return 1 if obtained,
529 //              0 otherwise
530
531 int bt_spinwritetry(BtSpinLatch *latch)
532 {
533 ushort prev;
534
535 #ifdef  unix
536         prev = __sync_fetch_and_or((ushort *)latch, XCL);
537 #else
538         prev = _InterlockedOr16((ushort *)latch, XCL);
539 #endif
540         //      take write access if all bits are clear
541
542         if( !(prev & XCL) )
543           if( !(prev & ~BOTH) )
544                 return 1;
545           else
546 #ifdef unix
547                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
548 #else
549                 _InterlockedAnd16((ushort *)latch, ~XCL);
550 #endif
551         return 0;
552 }
553
554 //      clear write mode
555
556 void bt_spinreleasewrite(BtSpinLatch *latch)
557 {
558 #ifdef unix
559         __sync_fetch_and_and((ushort *)latch, ~BOTH);
560 #else
561         _InterlockedAnd16((ushort *)latch, ~BOTH);
562 #endif
563 }
564
565 //      decrement reader count
566
567 void bt_spinreleaseread(BtSpinLatch *latch)
568 {
569 #ifdef unix
570         __sync_fetch_and_add((ushort *)latch, -SHARE);
571 #else
572         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
573 #endif
574 }
575
576 //      read page from permanent location in Btree file
577
578 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
579 {
580 off64_t off = page_no << mgr->page_bits;
581
582 #ifdef unix
583         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
584                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
585                 return BTERR_read;
586         }
587 #else
588 OVERLAPPED ovl[1];
589 uint amt[1];
590
591         memset (ovl, 0, sizeof(OVERLAPPED));
592         ovl->Offset = off;
593         ovl->OffsetHigh = off >> 32;
594
595         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
596                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
597                 return BTERR_read;
598         }
599         if( *amt <  mgr->page_size ) {
600                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
601                 return BTERR_read;
602         }
603 #endif
604         return 0;
605 }
606
607 //      write page to permanent location in Btree file
608 //      clear the dirty bit
609
610 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
611 {
612 off64_t off = page_no << mgr->page_bits;
613
614 #ifdef unix
615         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
616                 return BTERR_wrt;
617 #else
618 OVERLAPPED ovl[1];
619 uint amt[1];
620
621         memset (ovl, 0, sizeof(OVERLAPPED));
622         ovl->Offset = off;
623         ovl->OffsetHigh = off >> 32;
624
625         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
626                 return BTERR_wrt;
627
628         if( *amt <  mgr->page_size )
629                 return BTERR_wrt;
630 #endif
631         return 0;
632 }
633
634 //      link latch table entry into head of latch hash table
635
636 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
637 {
638 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
639 BtLatchSet *latch = bt->mgr->latchsets + slot;
640
641         if( latch->next = bt->mgr->hashtable[hashidx].slot )
642                 bt->mgr->latchsets[latch->next].prev = slot;
643
644         bt->mgr->hashtable[hashidx].slot = slot;
645         latch->page_no = page_no;
646         latch->slot = slot;
647         latch->prev = 0;
648         latch->pin = 1;
649
650         if( loadit )
651           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
652                 return bt->err;
653           else
654                 bt->reads++;
655
656         return bt->err = 0;
657 }
658
659 //      set CLOCK bit in latch
660 //      decrement pin count
661
662 void bt_unpinlatch (BtLatchSet *latch)
663 {
664 #ifdef unix
665         if( ~latch->pin & CLOCK_bit )
666                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
667         __sync_fetch_and_add(&latch->pin, -1);
668 #else
669         if( ~latch->pin & CLOCK_bit )
670                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
671         _InterlockedDecrement16 (&latch->pin);
672 #endif
673 }
674
675 //  return the btree cached page address
676
677 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
678 {
679 BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool);
680
681         return page;
682 }
683
684 //      find existing latchset or inspire new one
685 //      return with latchset pinned
686
687 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
688 {
689 uint hashidx = page_no % bt->mgr->latchhash;
690 BtLatchSet *latch;
691 uint slot, idx;
692 uint lvl, cnt;
693 off64_t off;
694 uint amt[1];
695 BtPage page;
696
697   //  try to find our entry
698
699   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
700
701   if( slot = bt->mgr->hashtable[hashidx].slot ) do
702   {
703         latch = bt->mgr->latchsets + slot;
704         if( page_no == latch->page_no )
705                 break;
706   } while( slot = latch->next );
707
708   //  found our entry
709   //  increment clock
710
711   if( slot ) {
712         latch = bt->mgr->latchsets + slot;
713 #ifdef unix
714         __sync_fetch_and_add(&latch->pin, 1);
715 #else
716         _InterlockedIncrement16 (&latch->pin);
717 #endif
718         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
719         return latch;
720   }
721
722         //  see if there are any unused pool entries
723 #ifdef unix
724         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
725 #else
726         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
727 #endif
728
729         if( slot < bt->mgr->latchtotal ) {
730                 latch = bt->mgr->latchsets + slot;
731                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
732                         return NULL;
733                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
734                 return latch;
735         }
736
737 #ifdef unix
738         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
739 #else
740         _InterlockedDecrement (&bt->mgr->latchdeployed);
741 #endif
742   //  find and reuse previous entry on victim
743
744   while( 1 ) {
745 #ifdef unix
746         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
747 #else
748         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
749 #endif
750         // try to get write lock on hash chain
751         //      skip entry if not obtained
752         //      or has outstanding pins
753
754         slot %= bt->mgr->latchtotal;
755
756         if( !slot )
757                 continue;
758
759         latch = bt->mgr->latchsets + slot;
760         idx = latch->page_no % bt->mgr->latchhash;
761
762         //      see we are on same chain as hashidx
763
764         if( idx == hashidx )
765                 continue;
766
767         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
768                 continue;
769
770         //  skip this slot if it is pinned
771         //      or the CLOCK bit is set
772
773         if( latch->pin ) {
774           if( latch->pin & CLOCK_bit ) {
775 #ifdef unix
776                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
777 #else
778                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
779 #endif
780           }
781           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
782           continue;
783         }
784
785         //  update permanent page area in btree from buffer pool
786
787         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
788
789         if( latch->dirty )
790           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
791                 return NULL;
792           else
793                 latch->dirty = 0, bt->writes++;
794
795         //  unlink our available slot from its hash chain
796
797         if( latch->prev )
798                 bt->mgr->latchsets[latch->prev].next = latch->next;
799         else
800                 bt->mgr->hashtable[idx].slot = latch->next;
801
802         if( latch->next )
803                 bt->mgr->latchsets[latch->next].prev = latch->prev;
804
805         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
806
807         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
808                 return NULL;
809
810         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
811         return latch;
812   }
813 }
814
815 void bt_mgrclose (BtMgr *mgr)
816 {
817 BtLatchSet *latch;
818 uint num = 0;
819 BtPage page;
820 uint slot;
821
822         //      flush dirty pool pages to the btree
823
824         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
825                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
826                 latch = mgr->latchsets + slot;
827
828                 if( latch->dirty ) {
829                         bt_writepage(mgr, page, latch->page_no);
830                         latch->dirty = 0, num++;
831                 }
832 //              madvise (page, mgr->page_size, MADV_DONTNEED);
833         }
834
835         fprintf(stderr, "%d buffer pool pages flushed\n", num);
836
837 #ifdef unix
838         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
839         munmap (mgr->pagezero, mgr->page_size);
840 #else
841         FlushViewOfFile(mgr->pagezero, 0);
842         UnmapViewOfFile(mgr->pagezero);
843         UnmapViewOfFile(mgr->hashtable);
844         CloseHandle(mgr->halloc);
845         CloseHandle(mgr->hpool);
846 #endif
847 #ifdef unix
848         close (mgr->idx);
849         free (mgr);
850 #else
851         FlushFileBuffers(mgr->idx);
852         CloseHandle(mgr->idx);
853         GlobalFree (mgr);
854 #endif
855 }
856
857 //      close and release memory
858
859 void bt_close (BtDb *bt)
860 {
861 #ifdef unix
862         if( bt->mem )
863                 free (bt->mem);
864 #else
865         if( bt->mem)
866                 VirtualFree (bt->mem, 0, MEM_RELEASE);
867 #endif
868         free (bt);
869 }
870
871 //  open/create new btree buffer manager
872
873 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
874 //              size of page pool (e.g. 262144)
875
876 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
877 {
878 uint lvl, attr, last, slot, idx;
879 uint nlatchpage, latchhash;
880 unsigned char value[BtId];
881 int flag, initit = 0;
882 BtPageZero *pagezero;
883 off64_t size;
884 uint amt[1];
885 BtMgr* mgr;
886 BtKey* key;
887 BtVal *val;
888
889         // determine sanity of page size and buffer pool
890
891         if( bits > BT_maxbits )
892                 bits = BT_maxbits;
893         else if( bits < BT_minbits )
894                 bits = BT_minbits;
895
896         if( nodemax < 16 ) {
897                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
898                 return NULL;
899         }
900
901 #ifdef unix
902         mgr = calloc (1, sizeof(BtMgr));
903
904         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
905
906         if( mgr->idx == -1 ) {
907                 fprintf (stderr, "Unable to open btree file\n");
908                 return free(mgr), NULL;
909         }
910 #else
911         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
912         attr = FILE_ATTRIBUTE_NORMAL;
913         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
914
915         if( mgr->idx == INVALID_HANDLE_VALUE )
916                 return GlobalFree(mgr), NULL;
917 #endif
918
919 #ifdef unix
920         pagezero = valloc (BT_maxpage);
921         *amt = 0;
922
923         // read minimum page size to get root info
924         //      to support raw disk partition files
925         //      check if bits == 0 on the disk.
926
927         if( size = lseek (mgr->idx, 0L, 2) )
928                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
929                         if( pagezero->alloc->bits )
930                                 bits = pagezero->alloc->bits;
931                         else
932                                 initit = 1;
933                 else
934                         return free(mgr), free(pagezero), NULL;
935         else
936                 initit = 1;
937 #else
938         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
939         size = GetFileSize(mgr->idx, amt);
940
941         if( size || *amt ) {
942                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
943                         return bt_mgrclose (mgr), NULL;
944                 bits = pagezero->alloc->bits;
945         } else
946                 initit = 1;
947 #endif
948
949         mgr->page_size = 1 << bits;
950         mgr->page_bits = bits;
951
952         //  calculate number of latch hash table entries
953
954         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
955         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
956
957         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
958         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
959         mgr->latchtotal = nodemax;
960
961         if( !initit )
962                 goto mgrlatch;
963
964         // initialize an empty b-tree with latch page, root page, page of leaves
965         // and page(s) of latches and page pool cache
966
967         memset (pagezero, 0, 1 << bits);
968         pagezero->alloc->bits = mgr->page_bits;
969         bt_putid(pagezero->alloc->right, MIN_lvl+1);
970
971         //  initialize left-most LEAF page in
972         //      alloc->left.
973
974         bt_putid (pagezero->alloc->left, LEAF_page);
975
976         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
977                 fprintf (stderr, "Unable to create btree page zero\n");
978                 return bt_mgrclose (mgr), NULL;
979         }
980
981         memset (pagezero, 0, 1 << bits);
982         pagezero->alloc->bits = mgr->page_bits;
983
984         for( lvl=MIN_lvl; lvl--; ) {
985                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
986                 key = keyptr(pagezero->alloc, 1);
987                 key->len = 2;           // create stopper key
988                 key->key[0] = 0xff;
989                 key->key[1] = 0xff;
990
991                 bt_putid(value, MIN_lvl - lvl + 1);
992                 val = valptr(pagezero->alloc, 1);
993                 val->len = lvl ? BtId : 0;
994                 memcpy (val->value, value, val->len);
995
996                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
997                 pagezero->alloc->lvl = lvl;
998                 pagezero->alloc->cnt = 1;
999                 pagezero->alloc->act = 1;
1000
1001                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1002                         fprintf (stderr, "Unable to create btree page zero\n");
1003                         return bt_mgrclose (mgr), NULL;
1004                 }
1005         }
1006
1007 mgrlatch:
1008 #ifdef unix
1009         free (pagezero);
1010 #else
1011         VirtualFree (pagezero, 0, MEM_RELEASE);
1012 #endif
1013 #ifdef unix
1014         // mlock the pagezero page
1015
1016         flag = PROT_READ | PROT_WRITE;
1017         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1018         if( mgr->pagezero == MAP_FAILED ) {
1019                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1020                 return bt_mgrclose (mgr), NULL;
1021         }
1022         mlock (mgr->pagezero, mgr->page_size);
1023
1024         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1025         if( mgr->hashtable == MAP_FAILED ) {
1026                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1027                 return bt_mgrclose (mgr), NULL;
1028         }
1029 #else
1030         flag = PAGE_READWRITE;
1031         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1032         if( !mgr->halloc ) {
1033                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1034                 return bt_mgrclose (mgr), NULL;
1035         }
1036
1037         flag = FILE_MAP_WRITE;
1038         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1039         if( !mgr->pagezero ) {
1040                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1041                 return bt_mgrclose (mgr), NULL;
1042         }
1043
1044         flag = PAGE_READWRITE;
1045         size = (uid)mgr->nlatchpage << mgr->page_bits;
1046         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1047         if( !mgr->hpool ) {
1048                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1049                 return bt_mgrclose (mgr), NULL;
1050         }
1051
1052         flag = FILE_MAP_WRITE;
1053         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1054         if( !mgr->hashtable ) {
1055                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1056                 return bt_mgrclose (mgr), NULL;
1057         }
1058 #endif
1059
1060         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1061         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1062
1063         return mgr;
1064 }
1065
1066 //      open BTree access method
1067 //      based on buffer manager
1068
1069 BtDb *bt_open (BtMgr *mgr)
1070 {
1071 BtDb *bt = malloc (sizeof(*bt));
1072
1073         memset (bt, 0, sizeof(*bt));
1074         bt->mgr = mgr;
1075 #ifdef unix
1076         bt->mem = valloc (2 *mgr->page_size);
1077 #else
1078         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1079 #endif
1080         bt->frame = (BtPage)bt->mem;
1081         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1082         return bt;
1083 }
1084
1085 //  compare two keys, returning > 0, = 0, or < 0
1086 //  as the comparison value
1087
1088 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1089 {
1090 uint len1 = key1->len;
1091 int ans;
1092
1093         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1094                 return ans;
1095
1096         if( len1 > len2 )
1097                 return 1;
1098         if( len1 < len2 )
1099                 return -1;
1100
1101         return 0;
1102 }
1103
1104 // place write, read, or parent lock on requested page_no.
1105
1106 void bt_lockpage(BtLock mode, BtLatchSet *set)
1107 {
1108         switch( mode ) {
1109         case BtLockRead:
1110                 ReadLock (set->readwr);
1111                 break;
1112         case BtLockWrite:
1113                 WriteLock (set->readwr);
1114                 break;
1115         case BtLockAccess:
1116                 ReadLock (set->access);
1117                 break;
1118         case BtLockDelete:
1119                 WriteLock (set->access);
1120                 break;
1121         case BtLockParent:
1122                 WriteLock (set->parent);
1123                 break;
1124         }
1125 }
1126
1127 // remove write, read, or parent lock on requested page
1128
1129 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1130 {
1131         switch( mode ) {
1132         case BtLockRead:
1133                 ReadRelease (set->readwr);
1134                 break;
1135         case BtLockWrite:
1136                 WriteRelease (set->readwr);
1137                 break;
1138         case BtLockAccess:
1139                 ReadRelease (set->access);
1140                 break;
1141         case BtLockDelete:
1142                 WriteRelease (set->access);
1143                 break;
1144         case BtLockParent:
1145                 WriteRelease (set->parent);
1146                 break;
1147         }
1148 }
1149
1150 //      allocate a new page
1151 //      return with page latched.
1152
1153 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1154 {
1155 int blk;
1156
1157         //      lock allocation page
1158
1159         bt_spinwritelock(bt->mgr->lock);
1160
1161         // use empty chain first
1162         // else allocate empty page
1163
1164         if( set->page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1165                 if( set->latch = bt_pinlatch (bt, set->page_no, 1) )
1166                         set->page = bt_mappage (bt, set->latch);
1167                 else
1168                         return bt->err = BTERR_struct, -1;
1169
1170                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1171                 bt_spinreleasewrite(bt->mgr->lock);
1172
1173                 memcpy (set->page, contents, bt->mgr->page_size);
1174                 set->latch->dirty = 1;
1175                 return 0;
1176         }
1177
1178         set->page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1179         bt_putid(bt->mgr->pagezero->alloc->right, set->page_no+1);
1180
1181         // unlock allocation latch
1182
1183         bt_spinreleasewrite(bt->mgr->lock);
1184
1185         //      don't load cache from btree page
1186
1187         if( set->latch = bt_pinlatch (bt, set->page_no, 0) )
1188                 set->page = bt_mappage (bt, set->latch);
1189         else
1190                 return bt->err = BTERR_struct;
1191
1192         memcpy (set->page, contents, bt->mgr->page_size);
1193         set->latch->dirty = 1;
1194         return 0;
1195 }
1196
1197 //  find slot in page for given key at a given level
1198
1199 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1200 {
1201 uint diff, higher = set->page->cnt, low = 1, slot;
1202 uint good = 0;
1203
1204         //        make stopper key an infinite fence value
1205
1206         if( bt_getid (set->page->right) )
1207                 higher++;
1208         else
1209                 good++;
1210
1211         //      low is the lowest candidate.
1212         //  loop ends when they meet
1213
1214         //  higher is already
1215         //      tested as .ge. the passed key.
1216
1217         while( diff = higher - low ) {
1218                 slot = low + ( diff >> 1 );
1219                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1220                         low = slot + 1;
1221                 else
1222                         higher = slot, good++;
1223         }
1224
1225         //      return zero if key is on right link page
1226
1227         return good ? higher : 0;
1228 }
1229
1230 //  find and load page at given level for given key
1231 //      leave page rd or wr locked as requested
1232
1233 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1234 {
1235 uid page_no = ROOT_page, prevpage = 0;
1236 uint drill = 0xff, slot;
1237 BtLatchSet *prevlatch;
1238 uint mode, prevmode;
1239
1240   //  start at root of btree and drill down
1241
1242   do {
1243         // determine lock mode of drill level
1244         mode = (drill == lvl) ? lock : BtLockRead; 
1245
1246         if( set->latch = bt_pinlatch (bt, page_no, 1) )
1247                 set->page_no = page_no;
1248         else
1249                 return 0;
1250
1251         // obtain access lock using lock chaining with Access mode
1252
1253         if( page_no > ROOT_page )
1254           bt_lockpage(BtLockAccess, set->latch);
1255
1256         set->page = bt_mappage (bt, set->latch);
1257
1258         //      release & unpin parent page
1259
1260         if( prevpage ) {
1261           bt_unlockpage(prevmode, prevlatch);
1262           bt_unpinlatch (prevlatch);
1263           prevpage = 0;
1264         }
1265
1266         // obtain read lock using lock chaining
1267
1268         bt_lockpage(mode, set->latch);
1269
1270         if( set->page->free )
1271                 return bt->err = BTERR_struct, 0;
1272
1273         if( page_no > ROOT_page )
1274           bt_unlockpage(BtLockAccess, set->latch);
1275
1276         // re-read and re-lock root after determining actual level of root
1277
1278         if( set->page->lvl != drill) {
1279                 if( set->page_no != ROOT_page )
1280                         return bt->err = BTERR_struct, 0;
1281                         
1282                 drill = set->page->lvl;
1283
1284                 if( lock != BtLockRead && drill == lvl ) {
1285                   bt_unlockpage(mode, set->latch);
1286                   bt_unpinlatch (set->latch);
1287                   continue;
1288                 }
1289         }
1290
1291         prevpage = set->page_no;
1292         prevlatch = set->latch;
1293         prevmode = mode;
1294
1295         //  find key on page at this level
1296         //  and descend to requested level
1297
1298         if( set->page->kill )
1299           goto slideright;
1300
1301         if( slot = bt_findslot (set, key, len) ) {
1302           if( drill == lvl )
1303                 return slot;
1304
1305           while( slotptr(set->page, slot)->dead )
1306                 if( slot++ < set->page->cnt )
1307                         continue;
1308                 else
1309                         goto slideright;
1310
1311           page_no = bt_getid(valptr(set->page, slot)->value);
1312           drill--;
1313           continue;
1314         }
1315
1316         //  or slide right into next page
1317
1318 slideright:
1319         page_no = bt_getid(set->page->right);
1320
1321   } while( page_no );
1322
1323   // return error on end of right chain
1324
1325   bt->err = BTERR_struct;
1326   return 0;     // return error
1327 }
1328
1329 //      return page to free list
1330 //      page must be delete & write locked
1331
1332 void bt_freepage (BtDb *bt, BtPageSet *set)
1333 {
1334         //      lock allocation page
1335
1336         bt_spinwritelock (bt->mgr->lock);
1337
1338         //      store chain
1339         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1340         bt_putid(bt->mgr->pagezero->chain, set->page_no);
1341         set->latch->dirty = 1;
1342         set->page->free = 1;
1343
1344         // unlock released page
1345
1346         bt_unlockpage (BtLockDelete, set->latch);
1347         bt_unlockpage (BtLockWrite, set->latch);
1348         bt_unpinlatch (set->latch);
1349
1350         // unlock allocation page
1351
1352         bt_spinreleasewrite (bt->mgr->lock);
1353 }
1354
1355 //      a fence key was deleted from a page
1356 //      push new fence value upwards
1357
1358 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1359 {
1360 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1361 unsigned char value[BtId];
1362 BtKey* ptr;
1363 uint idx;
1364
1365         //      remove the old fence value
1366
1367         ptr = keyptr(set->page, set->page->cnt);
1368         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1369         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1370         set->latch->dirty = 1;
1371
1372         //  cache new fence value
1373
1374         ptr = keyptr(set->page, set->page->cnt);
1375         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1376
1377         bt_lockpage (BtLockParent, set->latch);
1378         bt_unlockpage (BtLockWrite, set->latch);
1379
1380         //      insert new (now smaller) fence key
1381
1382         bt_putid (value, set->page_no);
1383         ptr = (BtKey*)leftkey;
1384
1385         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1386           return bt->err;
1387
1388         //      now delete old fence key
1389
1390         ptr = (BtKey*)rightkey;
1391
1392         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1393                 return bt->err;
1394
1395         bt_unlockpage (BtLockParent, set->latch);
1396         bt_unpinlatch(set->latch);
1397         return 0;
1398 }
1399
1400 //      root has a single child
1401 //      collapse a level from the tree
1402
1403 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1404 {
1405 BtPageSet child[1];
1406 uint idx;
1407
1408   // find the child entry and promote as new root contents
1409
1410   do {
1411         for( idx = 0; idx++ < root->page->cnt; )
1412           if( !slotptr(root->page, idx)->dead )
1413                 break;
1414
1415         child->page_no = bt_getid (valptr(root->page, idx)->value);
1416
1417         if( child->latch = bt_pinlatch (bt, child->page_no, 1) )
1418                 child->page = bt_mappage (bt, child->latch);
1419         else
1420                 return bt->err;
1421
1422         bt_lockpage (BtLockDelete, child->latch);
1423         bt_lockpage (BtLockWrite, child->latch);
1424
1425         memcpy (root->page, child->page, bt->mgr->page_size);
1426         root->latch->dirty = 1;
1427
1428         bt_freepage (bt, child);
1429
1430   } while( root->page->lvl > 1 && root->page->act == 1 );
1431
1432   bt_unlockpage (BtLockWrite, root->latch);
1433   bt_unpinlatch (root->latch);
1434   return 0;
1435 }
1436
1437 //      maintain reverse scan pointers by
1438 //      linking left pointer of far right node
1439
1440 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1441 {
1442 BtPageSet right2[1];
1443
1444         //      keep track of rightmost leaf page
1445
1446         if( !right_page_no ) {
1447           bt_putid (bt->mgr->pagezero->alloc->left, left_page_no);
1448           return 0;
1449         }
1450
1451         //      link right page to left page
1452
1453         if( right2->latch = bt_pinlatch (bt, right_page_no, 1) )
1454                 right2->page = bt_mappage (bt, right2->latch);
1455         else
1456                 return bt->err;
1457
1458         bt_lockpage (BtLockWrite, right2->latch);
1459
1460         bt_putid(right2->page->left, left_page_no);
1461         bt_unlockpage (BtLockWrite, right2->latch);
1462         bt_unpinlatch (right2->latch);
1463         return 0;
1464 }
1465
1466 //  find and delete key on page by marking delete flag bit
1467 //  if page becomes empty, delete it from the btree
1468
1469 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1470 {
1471 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1472 uint slot, idx, found, fence;
1473 BtPageSet set[1], right[1];
1474 unsigned char value[BtId];
1475 BtKey *ptr, *tst;
1476 BtVal *val;
1477
1478         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1479                 ptr = keyptr(set->page, slot);
1480         else
1481                 return bt->err;
1482
1483         // if librarian slot, advance to real slot
1484
1485         if( slotptr(set->page, slot)->type == Librarian )
1486                 ptr = keyptr(set->page, ++slot);
1487
1488         fence = slot == set->page->cnt;
1489
1490         // if key is found delete it, otherwise ignore request
1491
1492         if( found = !keycmp (ptr, key, len) )
1493           if( found = slotptr(set->page, slot)->dead == 0 ) {
1494                 val = valptr(set->page,slot);
1495                 slotptr(set->page, slot)->dead = 1;
1496                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1497                 set->page->act--;
1498
1499                 // collapse empty slots beneath the fence
1500
1501                 while( idx = set->page->cnt - 1 )
1502                   if( slotptr(set->page, idx)->dead ) {
1503                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1504                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1505                   } else
1506                         break;
1507           }
1508
1509         //      did we delete a fence key in an upper level?
1510
1511         if( found && lvl && set->page->act && fence )
1512           if( bt_fixfence (bt, set, lvl) )
1513                 return bt->err;
1514           else
1515                 return bt->found = found, 0;
1516
1517         //      do we need to collapse root?
1518
1519         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1520           if( bt_collapseroot (bt, set) )
1521                 return bt->err;
1522           else
1523                 return bt->found = found, 0;
1524
1525         //      return if page is not empty
1526
1527         if( set->page->act ) {
1528                 set->latch->dirty = 1;
1529                 bt_unlockpage(BtLockWrite, set->latch);
1530                 bt_unpinlatch (set->latch);
1531                 return bt->found = found, 0;
1532         }
1533
1534         //      cache copy of fence key
1535         //      to post in parent
1536
1537         ptr = keyptr(set->page, set->page->cnt);
1538         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1539
1540         //      obtain lock on right page
1541
1542         right->page_no = bt_getid(set->page->right);
1543
1544         if( right->latch = bt_pinlatch (bt, right->page_no, 1) )
1545                 right->page = bt_mappage (bt, right->latch);
1546         else
1547                 return 0;
1548
1549         bt_lockpage (BtLockWrite, right->latch);
1550
1551         if( right->page->kill )
1552                 return bt->err = BTERR_struct;
1553
1554         // transfer left link
1555
1556         memcpy (right->page->left, set->page->left, BtId);
1557
1558         // pull contents of right peer into our empty page
1559
1560         memcpy (set->page, right->page, bt->mgr->page_size);
1561         set->latch->dirty = 1;
1562
1563         // update left link
1564
1565         if( !lvl )
1566           if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1567                 return bt->err;
1568
1569         // cache copy of key to update
1570
1571         ptr = keyptr(right->page, right->page->cnt);
1572         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1573
1574         // mark right page deleted and point it to left page
1575         //      until we can post parent updates
1576
1577         bt_putid (right->page->right, set->page_no);
1578         right->latch->dirty = 1;
1579         right->page->kill = 1;
1580
1581         bt_lockpage (BtLockParent, right->latch);
1582         bt_unlockpage (BtLockWrite, right->latch);
1583
1584         bt_lockpage (BtLockParent, set->latch);
1585         bt_unlockpage (BtLockWrite, set->latch);
1586
1587         // redirect higher key directly to our new node contents
1588
1589         bt_putid (value, set->page_no);
1590         ptr = (BtKey*)higherfence;
1591
1592         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1593           return bt->err;
1594
1595         //      delete old lower key to our node
1596
1597         ptr = (BtKey*)lowerfence;
1598
1599         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1600           return bt->err;
1601
1602         //      obtain delete and write locks to right node
1603
1604         bt_unlockpage (BtLockParent, right->latch);
1605         bt_lockpage (BtLockDelete, right->latch);
1606         bt_lockpage (BtLockWrite, right->latch);
1607         bt_freepage (bt, right);
1608
1609         bt_unlockpage (BtLockParent, set->latch);
1610         bt_unpinlatch (set->latch);
1611         bt->found = found;
1612         return 0;
1613 }
1614
1615 BtKey *bt_foundkey (BtDb *bt)
1616 {
1617         return (BtKey*)(bt->key);
1618 }
1619
1620 //      advance to next slot
1621
1622 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1623 {
1624 BtLatchSet *prevlatch;
1625 uid page_no;
1626
1627         if( slot < set->page->cnt )
1628                 return slot + 1;
1629
1630         prevlatch = set->latch;
1631
1632         if( page_no = bt_getid(set->page->right) )
1633           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1634                 set->page = bt_mappage (bt, set->latch);
1635           else
1636                 return 0;
1637         else
1638           return bt->err = BTERR_struct, 0;
1639
1640         // obtain access lock using lock chaining with Access mode
1641
1642         bt_lockpage(BtLockAccess, set->latch);
1643
1644         bt_unlockpage(BtLockRead, prevlatch);
1645         bt_unpinlatch (prevlatch);
1646
1647         bt_lockpage(BtLockRead, set->latch);
1648         bt_unlockpage(BtLockAccess, set->latch);
1649
1650         set->page_no = page_no;
1651         return 1;
1652 }
1653
1654 //      find unique key or first duplicate key in
1655 //      leaf level and return number of value bytes
1656 //      or (-1) if not found.  Setup key for bt_foundkey
1657
1658 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1659 {
1660 BtPageSet set[1];
1661 uint len, slot;
1662 int ret = -1;
1663 BtKey *ptr;
1664 BtVal *val;
1665
1666   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1667    do {
1668         ptr = keyptr(set->page, slot);
1669
1670         //      skip librarian slot place holder
1671
1672         if( slotptr(set->page, slot)->type == Librarian )
1673                 ptr = keyptr(set->page, ++slot);
1674
1675         //      return actual key found
1676
1677         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1678         len = ptr->len;
1679
1680         if( slotptr(set->page, slot)->type == Duplicate )
1681                 len -= BtId;
1682
1683         //      not there if we reach the stopper key
1684
1685         if( slot == set->page->cnt )
1686           if( !bt_getid (set->page->right) )
1687                 break;
1688
1689         // if key exists, return >= 0 value bytes copied
1690         //      otherwise return (-1)
1691
1692         if( slotptr(set->page, slot)->dead )
1693                 continue;
1694
1695         if( keylen == len )
1696           if( !memcmp (ptr->key, key, len) ) {
1697                 val = valptr (set->page,slot);
1698                 if( valmax > val->len )
1699                         valmax = val->len;
1700                 memcpy (value, val->value, valmax);
1701                 ret = valmax;
1702           }
1703
1704         break;
1705
1706    } while( slot = bt_findnext (bt, set, slot) );
1707
1708   bt_unlockpage (BtLockRead, set->latch);
1709   bt_unpinlatch (set->latch);
1710   return ret;
1711 }
1712
1713 //      check page for space available,
1714 //      clean if necessary and return
1715 //      0 - page needs splitting
1716 //      >0  new slot value
1717
1718 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1719 {
1720 uint nxt = bt->mgr->page_size;
1721 BtPage page = set->page;
1722 uint cnt = 0, idx = 0;
1723 uint max = page->cnt;
1724 uint newslot = max;
1725 BtKey *key;
1726 BtVal *val;
1727
1728         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1729                 return slot;
1730
1731         //      skip cleanup and proceed to split
1732         //      if there's not enough garbage
1733         //      to bother with.
1734
1735         if( page->garbage < nxt / 5 )
1736                 return 0;
1737
1738         memcpy (bt->frame, page, bt->mgr->page_size);
1739
1740         // skip page info and set rest of page to zero
1741
1742         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1743         set->latch->dirty = 1;
1744         page->garbage = 0;
1745         page->act = 0;
1746
1747         // clean up page first by
1748         // removing deleted keys
1749
1750         while( cnt++ < max ) {
1751                 if( cnt == slot )
1752                         newslot = idx + 2;
1753
1754                 if( cnt < max || page->lvl )
1755                  if( slotptr(bt->frame,cnt)->dead )
1756                         continue;
1757
1758                 // copy the value across
1759
1760                 val = valptr(bt->frame, cnt);
1761                 nxt -= val->len + sizeof(BtVal);
1762                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1763
1764                 // copy the key across
1765
1766                 key = keyptr(bt->frame, cnt);
1767                 nxt -= key->len + sizeof(BtKey);
1768                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1769
1770                 // make a librarian slot
1771
1772                 slotptr(page, ++idx)->off = nxt;
1773                 slotptr(page, idx)->type = Librarian;
1774                 slotptr(page, idx)->dead = 1;
1775
1776                 // set up the slot
1777
1778                 slotptr(page, ++idx)->off = nxt;
1779                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1780
1781                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1782                         page->act++;
1783         }
1784
1785         page->min = nxt;
1786         page->cnt = idx;
1787
1788         //      see if page has enough space now, or does it need splitting?
1789
1790         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1791                 return newslot;
1792
1793         return 0;
1794 }
1795
1796 // split the root and raise the height of the btree
1797
1798 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
1799 {
1800 uint nxt = bt->mgr->page_size;
1801 unsigned char value[BtId];
1802 BtPageSet left[1];
1803 BtKey *ptr;
1804 BtVal *val;
1805
1806         //  Obtain an empty page to use, and copy the current
1807         //  root contents into it, e.g. lower keys
1808
1809         if( bt_newpage(bt, left, root->page) )
1810                 return bt->err;
1811
1812         bt_unpinlatch (left->latch);
1813
1814         // set left from higher to lower page
1815
1816         bt_putid (right->page->left, left->page_no);
1817         bt_unpinlatch (right->latch);
1818
1819         // preserve the page info at the bottom
1820         // of higher keys and set rest to zero
1821
1822         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1823
1824         // insert stopper key at top of newroot page
1825         // and increase the root height
1826
1827         nxt -= BtId + sizeof(BtVal);
1828         bt_putid (value, right->page_no);
1829         val = (BtVal *)((unsigned char *)root->page + nxt);
1830         memcpy (val->value, value, BtId);
1831         val->len = BtId;
1832
1833         nxt -= 2 + sizeof(BtKey);
1834         slotptr(root->page, 2)->off = nxt;
1835         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1836         ptr->len = 2;
1837         ptr->key[0] = 0xff;
1838         ptr->key[1] = 0xff;
1839
1840         // insert lower keys page fence key on newroot page as first key
1841
1842         nxt -= BtId + sizeof(BtVal);
1843         bt_putid (value, left->page_no);
1844         val = (BtVal *)((unsigned char *)root->page + nxt);
1845         memcpy (val->value, value, BtId);
1846         val->len = BtId;
1847
1848         nxt -= leftkey->len + sizeof(BtKey);
1849         slotptr(root->page, 1)->off = nxt;
1850         memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
1851         
1852         bt_putid(root->page->right, 0);
1853         root->page->min = nxt;          // reset lowest used offset and key count
1854         root->page->cnt = 2;
1855         root->page->act = 2;
1856         root->page->lvl++;
1857
1858         // release and unpin root pages
1859
1860         bt_unlockpage(BtLockWrite, root->latch);
1861         bt_unpinlatch (root->latch);
1862         return 0;
1863 }
1864
1865 //  split already locked full node
1866 //      return unlocked.
1867
1868 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1869 {
1870 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
1871 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1872 unsigned char value[BtId];
1873 uint lvl = set->page->lvl;
1874 BtPageSet right[1];
1875 BtKey *key, *ptr;
1876 BtVal *val, *src;
1877 uid right2;
1878 uint prev;
1879
1880         //  split higher half of keys to bt->frame
1881
1882         memset (bt->frame, 0, bt->mgr->page_size);
1883         max = set->page->cnt;
1884         cnt = max / 2;
1885         idx = 0;
1886
1887         while( cnt++ < max ) {
1888                 if( cnt < max || set->page->lvl )
1889                   if( slotptr(set->page, cnt)->dead )
1890                         continue;
1891
1892                 src = valptr(set->page, cnt);
1893                 nxt -= src->len + sizeof(BtVal);
1894                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1895
1896                 key = keyptr(set->page, cnt);
1897                 nxt -= key->len + sizeof(BtKey);
1898                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1899                 memcpy (ptr, key, key->len + sizeof(BtKey));
1900
1901                 //      add librarian slot
1902
1903                 slotptr(bt->frame, ++idx)->off = nxt;
1904                 slotptr(bt->frame, idx)->type = Librarian;
1905                 slotptr(bt->frame, idx)->dead = 1;
1906
1907                 //  add actual slot
1908
1909                 slotptr(bt->frame, ++idx)->off = nxt;
1910                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1911
1912                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1913                         bt->frame->act++;
1914         }
1915
1916         // remember existing fence key for new page to the right
1917
1918         memcpy (rightkey, key, key->len + sizeof(BtKey));
1919
1920         bt->frame->bits = bt->mgr->page_bits;
1921         bt->frame->min = nxt;
1922         bt->frame->cnt = idx;
1923         bt->frame->lvl = lvl;
1924
1925         // link right node
1926
1927         if( set->page_no > ROOT_page ) {
1928                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1929                 bt_putid(bt->frame->left, set->page_no);
1930         }
1931
1932         //      get new free page and write higher keys to it.
1933
1934         if( bt_newpage(bt, right, bt->frame) )
1935                 return bt->err;
1936
1937         // link left node
1938
1939         if( set->page_no > ROOT_page && !lvl )
1940           if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
1941                 return bt->err;
1942
1943         //      update lower keys to continue in old page
1944
1945         memcpy (bt->frame, set->page, bt->mgr->page_size);
1946         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1947         set->latch->dirty = 1;
1948
1949         nxt = bt->mgr->page_size;
1950         set->page->garbage = 0;
1951         set->page->act = 0;
1952         max /= 2;
1953         cnt = 0;
1954         idx = 0;
1955
1956         if( slotptr(bt->frame, max)->type == Librarian )
1957                 max--;
1958
1959         //  assemble page of smaller keys
1960
1961         while( cnt++ < max ) {
1962                 if( slotptr(bt->frame, cnt)->dead )
1963                         continue;
1964                 val = valptr(bt->frame, cnt);
1965                 nxt -= val->len + sizeof(BtVal);
1966                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1967
1968                 key = keyptr(bt->frame, cnt);
1969                 nxt -= key->len + sizeof(BtKey);
1970                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1971
1972                 //      add librarian slot
1973
1974                 slotptr(set->page, ++idx)->off = nxt;
1975                 slotptr(set->page, idx)->type = Librarian;
1976                 slotptr(set->page, idx)->dead = 1;
1977
1978                 //      add actual slot
1979
1980                 slotptr(set->page, ++idx)->off = nxt;
1981                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
1982                 set->page->act++;
1983         }
1984
1985         // remember fence key for smaller page
1986
1987         memcpy(fencekey, key, key->len + sizeof(BtKey));
1988
1989         bt_putid(set->page->right, right->page_no);
1990         set->page->min = nxt;
1991         set->page->cnt = idx;
1992
1993         // if current page is the root page, split it
1994
1995         if( set->page_no == ROOT_page )
1996                 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
1997
1998         // insert new fences in their parent pages
1999
2000         bt_lockpage (BtLockParent, right->latch);
2001
2002         bt_lockpage (BtLockParent, set->latch);
2003         bt_unlockpage (BtLockWrite, set->latch);
2004
2005         // insert new fence for reformulated left block of smaller keys
2006
2007         ptr = (BtKey*)fencekey;
2008         bt_putid (value, set->page_no);
2009
2010         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2011                 return bt->err;
2012
2013         // switch fence for right block of larger keys to new right page
2014
2015         ptr = (BtKey*)rightkey;
2016         bt_putid (value, right->page_no);
2017
2018         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2019                 return bt->err;
2020
2021         bt_unlockpage (BtLockParent, set->latch);
2022         bt_unpinlatch (set->latch);
2023
2024         bt_unlockpage (BtLockParent, right->latch);
2025         bt_unpinlatch (right->latch);
2026         return 0;
2027 }
2028
2029 //      install new key and value onto page
2030 //      page must already be checked for
2031 //      adequate space
2032
2033 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2034 {
2035 uint idx, librarian;
2036 BtSlot *node;
2037 BtKey *ptr;
2038 BtVal *val;
2039
2040         //      if found slot > desired slot and previous slot
2041         //      is a librarian slot, use it
2042
2043         if( slot > 1 )
2044           if( slotptr(set->page, slot-1)->type == Librarian )
2045                 slot--;
2046
2047         // copy value onto page
2048
2049         set->page->min -= vallen + sizeof(BtVal);
2050         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2051         memcpy (val->value, value, vallen);
2052         val->len = vallen;
2053
2054         // copy key onto page
2055
2056         set->page->min -= keylen + sizeof(BtKey);
2057         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2058         memcpy (ptr->key, key, keylen);
2059         ptr->len = keylen;
2060         
2061         //      find first empty slot
2062
2063         for( idx = slot; idx < set->page->cnt; idx++ )
2064           if( slotptr(set->page, idx)->dead )
2065                 break;
2066
2067         // now insert key into array before slot
2068
2069         if( idx == set->page->cnt )
2070                 idx += 2, set->page->cnt += 2, librarian = 2;
2071         else
2072                 librarian = 1;
2073
2074         set->latch->dirty = 1;
2075         set->page->act++;
2076
2077         while( idx > slot + librarian - 1 )
2078                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2079
2080         //      add librarian slot
2081
2082         if( librarian > 1 ) {
2083                 node = slotptr(set->page, slot++);
2084                 node->off = set->page->min;
2085                 node->type = Librarian;
2086                 node->dead = 1;
2087         }
2088
2089         //      fill in new slot
2090
2091         node = slotptr(set->page, slot);
2092         node->off = set->page->min;
2093         node->type = type;
2094         node->dead = 0;
2095
2096         bt_unlockpage (BtLockWrite, set->latch);
2097         bt_unpinlatch (set->latch);
2098         return 0;
2099 }
2100
2101 //  Insert new key into the btree at given level.
2102 //      either add a new key or update/add an existing one
2103
2104 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2105 {
2106 unsigned char newkey[BT_keyarray];
2107 uint slot, idx, len;
2108 BtPageSet set[1];
2109 BtKey *ptr, *ins;
2110 uid sequence;
2111 BtVal *val;
2112 uint type;
2113
2114   // set up the key we're working on
2115
2116   ins = (BtKey*)newkey;
2117   memcpy (ins->key, key, keylen);
2118   ins->len = keylen;
2119
2120   // is this a non-unique index value?
2121
2122   if( unique )
2123         type = Unique;
2124   else {
2125         type = Duplicate;
2126         sequence = bt_newdup (bt);
2127         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2128         ins->len += BtId;
2129   }
2130
2131   while( 1 ) { // find the page and slot for the current key
2132         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2133                 ptr = keyptr(set->page, slot);
2134         else {
2135                 if( !bt->err )
2136                         bt->err = BTERR_ovflw;
2137                 return bt->err;
2138         }
2139
2140         // if librarian slot == found slot, advance to real slot
2141
2142         if( slotptr(set->page, slot)->type == Librarian )
2143           if( !keycmp (ptr, key, keylen) )
2144                 ptr = keyptr(set->page, ++slot);
2145
2146         len = ptr->len;
2147
2148         if( slotptr(set->page, slot)->type == Duplicate )
2149                 len -= BtId;
2150
2151         //  if inserting a duplicate key or unique key
2152         //      check for adequate space on the page
2153         //      and insert the new key before slot.
2154
2155         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2156           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2157                 if( bt_splitpage (bt, set) )
2158                   return bt->err;
2159                 else
2160                   continue;
2161
2162           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2163         }
2164
2165         // if key already exists, update value and return
2166
2167         val = valptr(set->page, slot);
2168
2169         if( val->len >= vallen ) {
2170                 if( slotptr(set->page, slot)->dead )
2171                         set->page->act++;
2172                 set->page->garbage += val->len - vallen;
2173                 set->latch->dirty = 1;
2174                 slotptr(set->page, slot)->dead = 0;
2175                 val->len = vallen;
2176                 memcpy (val->value, value, vallen);
2177                 bt_unlockpage(BtLockWrite, set->latch);
2178                 bt_unpinlatch (set->latch);
2179                 return 0;
2180         }
2181
2182         //      new update value doesn't fit in existing value area
2183
2184         if( !slotptr(set->page, slot)->dead )
2185                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2186         else {
2187                 slotptr(set->page, slot)->dead = 0;
2188                 set->page->act++;
2189         }
2190
2191         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2192           if( bt_splitpage (bt, set) )
2193                 return bt->err;
2194           else
2195                 continue;
2196
2197         set->page->min -= vallen + sizeof(BtVal);
2198         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2199         memcpy (val->value, value, vallen);
2200         val->len = vallen;
2201
2202         set->latch->dirty = 1;
2203         set->page->min -= keylen + sizeof(BtKey);
2204         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2205         memcpy (ptr->key, key, keylen);
2206         ptr->len = keylen;
2207         
2208         slotptr(set->page, slot)->off = set->page->min;
2209         bt_unlockpage(BtLockWrite, set->latch);
2210         bt_unpinlatch (set->latch);
2211         return 0;
2212   }
2213   return 0;
2214 }
2215
2216 //      set cursor to highest slot on highest page
2217
2218 uint bt_lastkey (BtDb *bt)
2219 {
2220 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2221 BtPageSet set[1];
2222 uint slot;
2223
2224         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2225                 set->page = bt_mappage (bt, set->latch);
2226         else
2227                 return 0;
2228
2229     bt_lockpage(BtLockRead, set->latch);
2230
2231         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2232         slot = set->page->cnt;
2233
2234     bt_unlockpage(BtLockRead, set->latch);
2235         bt_unpinlatch (set->latch);
2236         return slot;
2237 }
2238
2239 //      return previous slot on cursor page
2240
2241 uint bt_prevkey (BtDb *bt, uint slot)
2242 {
2243 BtPageSet set[1];
2244 uid left;
2245
2246         if( --slot )
2247                 return slot;
2248
2249         if( left = bt_getid(bt->cursor->left) )
2250                 bt->cursor_page = left;
2251         else
2252                 return 0;
2253
2254         if( set->latch = bt_pinlatch (bt, left, 1) )
2255                 set->page = bt_mappage (bt, set->latch);
2256         else
2257                 return 0;
2258
2259     bt_lockpage(BtLockRead, set->latch);
2260         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2261         bt_unlockpage(BtLockRead, set->latch);
2262         bt_unpinlatch (set->latch);
2263         return bt->cursor->cnt;
2264 }
2265
2266 //  return next slot on cursor page
2267 //  or slide cursor right into next page
2268
2269 uint bt_nextkey (BtDb *bt, uint slot)
2270 {
2271 BtPageSet set[1];
2272 uid right;
2273
2274   do {
2275         right = bt_getid(bt->cursor->right);
2276
2277         while( slot++ < bt->cursor->cnt )
2278           if( slotptr(bt->cursor,slot)->dead )
2279                 continue;
2280           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2281                 return slot;
2282           else
2283                 break;
2284
2285         if( !right )
2286                 break;
2287
2288         bt->cursor_page = right;
2289
2290         if( set->latch = bt_pinlatch (bt, right, 1) )
2291                 set->page = bt_mappage (bt, set->latch);
2292         else
2293                 return 0;
2294
2295     bt_lockpage(BtLockRead, set->latch);
2296
2297         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2298
2299         bt_unlockpage(BtLockRead, set->latch);
2300         bt_unpinlatch (set->latch);
2301         slot = 0;
2302
2303   } while( 1 );
2304
2305   return bt->err = 0;
2306 }
2307
2308 //  cache page of keys into cursor and return starting slot for given key
2309
2310 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2311 {
2312 BtPageSet set[1];
2313 uint slot;
2314
2315         // cache page for retrieval
2316
2317         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2318           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2319         else
2320           return 0;
2321
2322         bt->cursor_page = set->page_no;
2323
2324         bt_unlockpage(BtLockRead, set->latch);
2325         bt_unpinlatch (set->latch);
2326         return slot;
2327 }
2328
2329 BtKey *bt_key(BtDb *bt, uint slot)
2330 {
2331         return keyptr(bt->cursor, slot);
2332 }
2333
2334 BtVal *bt_val(BtDb *bt, uint slot)
2335 {
2336         return valptr(bt->cursor,slot);
2337 }
2338
2339 #ifdef STANDALONE
2340
2341 #ifndef unix
2342 double getCpuTime(int type)
2343 {
2344 FILETIME crtime[1];
2345 FILETIME xittime[1];
2346 FILETIME systime[1];
2347 FILETIME usrtime[1];
2348 SYSTEMTIME timeconv[1];
2349 double ans = 0;
2350
2351         memset (timeconv, 0, sizeof(SYSTEMTIME));
2352
2353         switch( type ) {
2354         case 0:
2355                 GetSystemTimeAsFileTime (xittime);
2356                 FileTimeToSystemTime (xittime, timeconv);
2357                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2358                 break;
2359         case 1:
2360                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2361                 FileTimeToSystemTime (usrtime, timeconv);
2362                 break;
2363         case 2:
2364                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2365                 FileTimeToSystemTime (systime, timeconv);
2366                 break;
2367         }
2368
2369         ans += (double)timeconv->wHour * 3600;
2370         ans += (double)timeconv->wMinute * 60;
2371         ans += (double)timeconv->wSecond;
2372         ans += (double)timeconv->wMilliseconds / 1000;
2373         return ans;
2374 }
2375 #else
2376 #include <time.h>
2377 #include <sys/resource.h>
2378
2379 double getCpuTime(int type)
2380 {
2381 struct rusage used[1];
2382 struct timeval tv[1];
2383
2384         switch( type ) {
2385         case 0:
2386                 gettimeofday(tv, NULL);
2387                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2388
2389         case 1:
2390                 getrusage(RUSAGE_SELF, used);
2391                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2392
2393         case 2:
2394                 getrusage(RUSAGE_SELF, used);
2395                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2396         }
2397
2398         return 0;
2399 }
2400 #endif
2401
2402 void bt_poolaudit (BtMgr *mgr)
2403 {
2404 BtLatchSet *latch;
2405 uint slot = 0;
2406
2407         while( slot++ < mgr->latchdeployed ) {
2408                 latch = mgr->latchsets + slot;
2409
2410                 if( *latch->readwr->rin & MASK )
2411                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2412                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2413
2414                 if( *latch->access->rin & MASK )
2415                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2416                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2417
2418                 if( *latch->parent->rin & MASK )
2419                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2420                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2421
2422                 if( latch->pin & ~CLOCK_bit ) {
2423                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
2424                         latch->pin = 0;
2425                 }
2426         }
2427 }
2428
2429 uint bt_latchaudit (BtDb *bt)
2430 {
2431 ushort idx, hashidx;
2432 uid next, page_no;
2433 BtLatchSet *latch;
2434 uint cnt = 0;
2435 BtKey *ptr;
2436
2437         if( *(ushort *)(bt->mgr->lock) )
2438                 fprintf(stderr, "Alloc page locked\n");
2439         *(ushort *)(bt->mgr->lock) = 0;
2440
2441         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2442                 latch = bt->mgr->latchsets + idx;
2443                 if( *latch->readwr->rin & MASK )
2444                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2445                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2446
2447                 if( *latch->access->rin & MASK )
2448                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2449                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2450
2451                 if( *latch->parent->rin & MASK )
2452                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2453                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2454
2455                 if( latch->pin ) {
2456                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2457                         latch->pin = 0;
2458                 }
2459         }
2460
2461         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2462           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
2463                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2464
2465           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
2466
2467           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2468                 latch = bt->mgr->latchsets + idx;
2469                 if( latch->pin )
2470                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2471           } while( idx = latch->next );
2472         }
2473
2474         page_no = LEAF_page;
2475
2476         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2477         uid off = page_no << bt->mgr->page_bits;
2478 #ifdef unix
2479           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2480 #else
2481         DWORD amt[1];
2482
2483           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2484
2485           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2486                 return bt->err = BTERR_map;
2487
2488           if( *amt <  bt->mgr->page_size )
2489                 return bt->err = BTERR_map;
2490 #endif
2491                 if( !bt->frame->free && !bt->frame->lvl )
2492                         cnt += bt->frame->act;
2493                 page_no++;
2494         }
2495                 
2496         cnt--;  // remove stopper key
2497         fprintf(stderr, " Total keys read %d\n", cnt);
2498
2499         bt_close (bt);
2500         return 0;
2501 }
2502
2503 typedef struct {
2504         char idx;
2505         char *type;
2506         char *infile;
2507         BtMgr *mgr;
2508         int num;
2509 } ThreadArg;
2510
2511 //  standalone program to index file of keys
2512 //  then list them onto std-out
2513
2514 #ifdef unix
2515 void *index_file (void *arg)
2516 #else
2517 uint __stdcall index_file (void *arg)
2518 #endif
2519 {
2520 int line = 0, found = 0, cnt = 0, unique;
2521 uid next, page_no = LEAF_page;  // start on first page of leaves
2522 unsigned char key[BT_maxkey];
2523 ThreadArg *args = arg;
2524 int ch, len = 0, slot;
2525 BtPageSet set[1];
2526 BtKey *ptr;
2527 BtVal *val;
2528 BtDb *bt;
2529 FILE *in;
2530
2531         bt = bt_open (args->mgr);
2532
2533         unique = (args->type[1] | 0x20) != 'd';
2534
2535         switch(args->type[0] | 0x20)
2536         {
2537         case 'a':
2538                 fprintf(stderr, "started latch mgr audit\n");
2539                 cnt = bt_latchaudit (bt);
2540                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2541                 break;
2542
2543         case 'p':
2544                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2545                 if( in = fopen (args->infile, "rb") )
2546                   while( ch = getc(in), ch != EOF )
2547                         if( ch == '\n' )
2548                         {
2549                           line++;
2550
2551                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2552                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2553                           len = 0;
2554                         }
2555                         else if( len < BT_maxkey )
2556                                 key[len++] = ch;
2557                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2558                 break;
2559
2560         case 'w':
2561                 fprintf(stderr, "started indexing for %s\n", args->infile);
2562                 if( in = fopen (args->infile, "rb") )
2563                   while( ch = getc(in), ch != EOF )
2564                         if( ch == '\n' )
2565                         {
2566                           line++;
2567
2568                           if( args->num == 1 )
2569                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2570
2571                           else if( args->num )
2572                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2573
2574                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2575                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2576                           len = 0;
2577                         }
2578                         else if( len < BT_maxkey )
2579                                 key[len++] = ch;
2580                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2581                 break;
2582
2583         case 'd':
2584                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2585                 if( in = fopen (args->infile, "rb") )
2586                   while( ch = getc(in), ch != EOF )
2587                         if( ch == '\n' )
2588                         {
2589                           line++;
2590                           if( args->num == 1 )
2591                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2592
2593                           else if( args->num )
2594                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2595
2596                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2597                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2598                           ptr = (BtKey*)(bt->key);
2599                           found++;
2600
2601                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2602                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2603                           len = 0;
2604                         }
2605                         else if( len < BT_maxkey )
2606                                 key[len++] = ch;
2607                 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2608                 break;
2609
2610         case 'f':
2611                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2612                 if( in = fopen (args->infile, "rb") )
2613                   while( ch = getc(in), ch != EOF )
2614                         if( ch == '\n' )
2615                         {
2616                           line++;
2617                           if( args->num == 1 )
2618                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2619
2620                           else if( args->num )
2621                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2622
2623                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2624                                 found++;
2625                           else if( bt->err )
2626                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2627                           len = 0;
2628                         }
2629                         else if( len < BT_maxkey )
2630                                 key[len++] = ch;
2631                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2632                 break;
2633
2634         case 's':
2635                 fprintf(stderr, "started scanning\n");
2636                 do {
2637                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2638                                 set->page = bt_mappage (bt, set->latch);
2639                         else
2640                                 fprintf(stderr, "unable to obtain latch"), exit(1);
2641                         bt_lockpage (BtLockRead, set->latch);
2642                         next = bt_getid (set->page->right);
2643
2644                         for( slot = 0; slot++ < set->page->cnt; )
2645                          if( next || slot < set->page->cnt )
2646                           if( !slotptr(set->page, slot)->dead ) {
2647                                 ptr = keyptr(set->page, slot);
2648                                 len = ptr->len;
2649
2650                             if( slotptr(set->page, slot)->type == Duplicate )
2651                                         len -= BtId;
2652
2653                                 fwrite (ptr->key, len, 1, stdout);
2654                                 val = valptr(set->page, slot);
2655                                 fwrite (val->value, val->len, 1, stdout);
2656                                 fputc ('\n', stdout);
2657                                 cnt++;
2658                            }
2659
2660                         bt_unlockpage (BtLockRead, set->latch);
2661                         bt_unpinlatch (set->latch);
2662                 } while( page_no = next );
2663
2664                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2665                 break;
2666
2667         case 'r':
2668                 fprintf(stderr, "started reverse scan\n");
2669                 if( slot = bt_lastkey (bt) )
2670                    while( slot = bt_prevkey (bt, slot) ) {
2671                         if( slotptr(bt->cursor, slot)->dead )
2672                           continue;
2673
2674                         ptr = keyptr(bt->cursor, slot);
2675                         len = ptr->len;
2676
2677                         if( slotptr(bt->cursor, slot)->type == Duplicate )
2678                                 len -= BtId;
2679
2680                         fwrite (ptr->key, len, 1, stdout);
2681                         val = valptr(bt->cursor, slot);
2682                         fwrite (val->value, val->len, 1, stdout);
2683                         fputc ('\n', stdout);
2684                         cnt++;
2685                   }
2686
2687                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2688                 break;
2689
2690         case 'c':
2691 #ifdef unix
2692                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2693 #endif
2694                 fprintf(stderr, "started counting\n");
2695                 page_no = LEAF_page;
2696
2697                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2698                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
2699                                 break;
2700
2701                         if( !bt->frame->free && !bt->frame->lvl )
2702                                 cnt += bt->frame->act;
2703
2704                         bt->reads++;
2705                         page_no++;
2706                 }
2707                 
2708                 cnt--;  // remove stopper key
2709                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2710                 break;
2711         }
2712
2713         bt_close (bt);
2714 #ifdef unix
2715         return NULL;
2716 #else
2717         return 0;
2718 #endif
2719 }
2720
2721 typedef struct timeval timer;
2722
2723 int main (int argc, char **argv)
2724 {
2725 int idx, cnt, len, slot, err;
2726 int segsize, bits = 16;
2727 double start, stop;
2728 #ifdef unix
2729 pthread_t *threads;
2730 #else
2731 HANDLE *threads;
2732 #endif
2733 ThreadArg *args;
2734 uint poolsize = 0;
2735 float elapsed;
2736 int num = 0;
2737 char key[1];
2738 BtMgr *mgr;
2739 BtKey *ptr;
2740 BtDb *bt;
2741
2742         if( argc < 3 ) {
2743                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2744                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2745                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
2746                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2747                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2748                 exit(0);
2749         }
2750
2751         start = getCpuTime(0);
2752
2753         if( argc > 3 )
2754                 bits = atoi(argv[3]);
2755
2756         if( argc > 4 )
2757                 poolsize = atoi(argv[4]);
2758
2759         if( !poolsize )
2760                 fprintf (stderr, "Warning: no mapped_pool\n");
2761
2762         if( argc > 5 )
2763                 num = atoi(argv[5]);
2764
2765         cnt = argc - 6;
2766 #ifdef unix
2767         threads = malloc (cnt * sizeof(pthread_t));
2768 #else
2769         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2770 #endif
2771         args = malloc (cnt * sizeof(ThreadArg));
2772
2773         mgr = bt_mgr ((argv[1]), bits, poolsize);
2774
2775         if( !mgr ) {
2776                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2777                 exit (1);
2778         }
2779
2780         //      fire off threads
2781
2782         for( idx = 0; idx < cnt; idx++ ) {
2783                 args[idx].infile = argv[idx + 6];
2784                 args[idx].type = argv[2];
2785                 args[idx].mgr = mgr;
2786                 args[idx].num = num;
2787                 args[idx].idx = idx;
2788 #ifdef unix
2789                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2790                         fprintf(stderr, "Error creating thread %d\n", err);
2791 #else
2792                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2793 #endif
2794         }
2795
2796         //      wait for termination
2797
2798 #ifdef unix
2799         for( idx = 0; idx < cnt; idx++ )
2800                 pthread_join (threads[idx], NULL);
2801 #else
2802         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2803
2804         for( idx = 0; idx < cnt; idx++ )
2805                 CloseHandle(threads[idx]);
2806
2807 #endif
2808         bt_poolaudit(mgr);
2809         bt_mgrclose (mgr);
2810
2811         elapsed = getCpuTime(0) - start;
2812         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2813         elapsed = getCpuTime(1);
2814         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2815         elapsed = getCpuTime(2);
2816         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2817 }
2818
2819 #endif  //STANDALONE