]> pd.if.org Git - btree/blob - threadskv6.c
fix latch protocol in bt_atomicload
[btree] / threadskv6.c
1 // btree version threadskv6 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      and traditional buffer pool manager
8
9 // 10 SEP 2014
10
11 // author: karl malbrain, malbrain@cal.berkeley.edu
12
13 /*
14 This work, including the source code, documentation
15 and related data, is placed into the public domain.
16
17 The orginal author is Karl Malbrain.
18
19 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
20 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
21 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
22 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
23 RESULTING FROM THE USE, MODIFICATION, OR
24 REDISTRIBUTION OF THIS SOFTWARE.
25 */
26
27 // Please see the project home page for documentation
28 // code.google.com/p/high-concurrency-btree
29
30 #define _FILE_OFFSET_BITS 64
31 #define _LARGEFILE64_SOURCE
32
33 #ifdef linux
34 #define _GNU_SOURCE
35 #endif
36
37 #ifdef unix
38 #include <unistd.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <fcntl.h>
42 #include <sys/time.h>
43 #include <sys/mman.h>
44 #include <errno.h>
45 #include <pthread.h>
46 #else
47 #define WIN32_LEAN_AND_MEAN
48 #include <windows.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51 #include <time.h>
52 #include <fcntl.h>
53 #include <process.h>
54 #include <intrin.h>
55 #endif
56
57 #include <memory.h>
58 #include <string.h>
59 #include <stddef.h>
60
61 typedef unsigned long long      uid;
62
63 #ifndef unix
64 typedef unsigned long long      off64_t;
65 typedef unsigned short          ushort;
66 typedef unsigned int            uint;
67 #endif
68
69 #define BT_ro 0x6f72    // ro
70 #define BT_rw 0x7772    // rw
71
72 #define BT_maxbits              24                                      // maximum page size in bits
73 #define BT_minbits              9                                       // minimum page size in bits
74 #define BT_minpage              (1 << BT_minbits)       // minimum page size
75 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
76
77 //  BTree page number constants
78 #define ALLOC_page              0       // allocation page
79 #define ROOT_page               1       // root of the btree
80 #define LEAF_page               2       // first page of leaves
81
82 //      Number of levels to create in a new BTree
83
84 #define MIN_lvl                 2
85
86 /*
87 There are five lock types for each node in three independent sets: 
88 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
89 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
90 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
91 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
92 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
93 */
94
95 typedef enum{
96         BtLockAccess,
97         BtLockDelete,
98         BtLockRead,
99         BtLockWrite,
100         BtLockParent
101 } BtLock;
102
103 //      definition for phase-fair reader/writer lock implementation
104
105 typedef struct {
106         ushort rin[1];
107         ushort rout[1];
108         ushort ticket[1];
109         ushort serving[1];
110 } RWLock;
111
112 #define PHID 0x1
113 #define PRES 0x2
114 #define MASK 0x3
115 #define RINC 0x4
116
117 //      definition for spin latch implementation
118
119 // exclusive is set for write access
120 // share is count of read accessors
121 // grant write lock when share == 0
122
123 volatile typedef struct {
124         ushort exclusive:1;
125         ushort pending:1;
126         ushort share:14;
127 } BtSpinLatch;
128
129 #define XCL 1
130 #define PEND 2
131 #define BOTH 3
132 #define SHARE 4
133
134 //  hash table entries
135
136 typedef struct {
137         volatile uint slot;             // Latch table entry at head of chain
138         BtSpinLatch latch[1];
139 } BtHashEntry;
140
141 //      latch manager table structure
142
143 typedef struct {
144         volatile uid page_no;   // latch set page number
145         RWLock readwr[1];               // read/write page lock
146         RWLock access[1];               // Access Intent/Page delete
147         RWLock parent[1];               // Posting of fence key in parent
148         uint slot;                              // entry slot in latch table
149         uint next;                              // next entry in hash table chain
150         uint prev;                              // prev entry in hash table chain
151         volatile ushort pin;    // number of outstanding threads
152         ushort dirty:1;                 // page in cache is dirty
153 } BtLatchSet;
154
155 //      Define the length of the page record numbers
156
157 #define BtId 6
158
159 //      Page key slot definition.
160
161 //      Keys are marked dead, but remain on the page until
162 //      it cleanup is called. The fence key (highest key) for
163 //      a leaf page is always present, even after cleanup.
164
165 //      Slot types
166
167 //      In addition to the Unique keys that occupy slots
168 //      there are Librarian and Duplicate key
169 //      slots occupying the key slot array.
170
171 //      The Librarian slots are dead keys that
172 //      serve as filler, available to add new Unique
173 //      or Dup slots that are inserted into the B-tree.
174
175 //      The Duplicate slots have had their key bytes extended
176 //      by 6 bytes to contain a binary duplicate key uniqueifier.
177
178 typedef enum {
179         Unique,
180         Librarian,
181         Duplicate
182 } BtSlotType;
183
184 typedef struct {
185         uint off:BT_maxbits;    // page offset for key start
186         uint type:3;                    // type of slot
187         uint dead:1;                    // set for deleted slot
188 } BtSlot;
189
190 //      The key structure occupies space at the upper end of
191 //      each page.  It's a length byte followed by the key
192 //      bytes.
193
194 typedef struct {
195         unsigned char len;              // this can be changed to a ushort or uint
196         unsigned char key[0];
197 } BtKey;
198
199 //      the value structure also occupies space at the upper
200 //      end of the page. Each key is immediately followed by a value.
201
202 typedef struct {
203         unsigned char len;              // this can be changed to a ushort or uint
204         unsigned char value[0];
205 } BtVal;
206
207 #define BT_maxkey       255             // maximum number of bytes in a key
208 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
209
210 //      The first part of an index page.
211 //      It is immediately followed
212 //      by the BtSlot array of keys.
213
214 //      note that this structure size
215 //      must be a multiple of 8 bytes
216 //      in order to place dups correctly.
217
218 typedef struct BtPage_ {
219         uint cnt;                                       // count of keys in page
220         uint act;                                       // count of active keys
221         uint min;                                       // next key offset
222         uint garbage;                           // page garbage in bytes
223         unsigned char bits:7;           // page size in bits
224         unsigned char free:1;           // page is on free chain
225         unsigned char lvl:7;            // level of page
226         unsigned char kill:1;           // page is being deleted
227         unsigned char left[BtId];       // page number to left
228         unsigned char filler[2];        // padding to multiple of 8
229         unsigned char right[BtId];      // page number to right
230 } *BtPage;
231
232 //  The loadpage interface object
233
234 typedef struct {
235         uid page_no;            // current page number
236         BtPage page;            // current page pointer
237         BtLatchSet *latch;      // current page latch set
238 } BtPageSet;
239
240 //      structure for latch manager on ALLOC_page
241
242 typedef struct {
243         struct BtPage_ alloc[1];        // next page_no in right ptr
244         unsigned long long dups[1];     // global duplicate key uniqueifier
245         unsigned char chain[BtId];      // head of free page_nos chain
246 } BtPageZero;
247
248 //      The object structure for Btree access
249
250 typedef struct {
251         uint page_size;                         // page size    
252         uint page_bits;                         // page size in bits    
253 #ifdef unix
254         int idx;
255 #else
256         HANDLE idx;
257 #endif
258         BtPageZero *pagezero;           // mapped allocation page
259         BtSpinLatch lock[1];            // allocation area lite latch
260         uint latchdeployed;                     // highest number of latch entries deployed
261         uint nlatchpage;                        // number of latch pages at BT_latch
262         uint latchtotal;                        // number of page latch entries
263         uint latchhash;                         // number of latch hash table slots
264         uint latchvictim;                       // next latch entry to examine
265         BtHashEntry *hashtable;         // the anonymous mapping buffer pool
266         BtLatchSet *latchsets;          // mapped latch set from latch pages
267         unsigned char *pagepool;        // mapped to the buffer pool pages
268 #ifndef unix
269         HANDLE halloc;                          // allocation handle
270         HANDLE hpool;                           // buffer pool handle
271 #endif
272 } BtMgr;
273
274 typedef struct {
275         BtMgr *mgr;                             // buffer manager for thread
276         BtPage cursor;                  // cached frame for start/next (never mapped)
277         BtPage frame;                   // spare frame for the page split (never mapped)
278         uid cursor_page;                                // current cursor page number   
279         unsigned char *mem;                             // frame, cursor, page memory buffer
280         unsigned char key[BT_keyarray]; // last found complete key
281         int found;                              // last delete or insert was found
282         int err;                                // last error
283         int reads, writes;              // number of reads and writes from the btree
284 } BtDb;
285
286 typedef enum {
287         BTERR_ok = 0,
288         BTERR_struct,
289         BTERR_ovflw,
290         BTERR_lock,
291         BTERR_map,
292         BTERR_read,
293         BTERR_wrt,
294         BTERR_hash
295 } BTERR;
296
297 #define CLOCK_bit 0x8000
298
299 // B-Tree functions
300 extern void bt_close (BtDb *bt);
301 extern BtDb *bt_open (BtMgr *mgr);
302 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
303 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
304 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
305 extern BtKey *bt_foundkey (BtDb *bt);
306 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
307 extern uint bt_nextkey   (BtDb *bt, uint slot);
308
309 //      manager functions
310 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
311 void bt_mgrclose (BtMgr *mgr);
312
313 //  Helper functions to return slot values
314 //      from the cursor page.
315
316 extern BtKey *bt_key (BtDb *bt, uint slot);
317 extern BtVal *bt_val (BtDb *bt, uint slot);
318
319 //  The page is allocated from low and hi ends.
320 //  The key slots are allocated from the bottom,
321 //      while the text and value of the key
322 //  are allocated from the top.  When the two
323 //  areas meet, the page is split into two.
324
325 //  A key consists of a length byte, two bytes of
326 //  index number (0 - 65535), and up to 253 bytes
327 //  of key value.
328
329 //  Associated with each key is a value byte string
330 //      containing any value desired.
331
332 //  The b-tree root is always located at page 1.
333 //      The first leaf page of level zero is always
334 //      located on page 2.
335
336 //      The b-tree pages are linked with next
337 //      pointers to facilitate enumerators,
338 //      and provide for concurrency.
339
340 //      When to root page fills, it is split in two and
341 //      the tree height is raised by a new root at page
342 //      one with two keys.
343
344 //      Deleted keys are marked with a dead bit until
345 //      page cleanup. The fence key for a leaf node is
346 //      always present
347
348 //  To achieve maximum concurrency one page is locked at a time
349 //  as the tree is traversed to find leaf key in question. The right
350 //  page numbers are used in cases where the page is being split,
351 //      or consolidated.
352
353 //  Page 0 is dedicated to lock for new page extensions,
354 //      and chains empty pages together for reuse. It also
355 //      contains the latch manager hash table.
356
357 //      The ParentModification lock on a node is obtained to serialize posting
358 //      or changing the fence key for a node.
359
360 //      Empty pages are chained together through the ALLOC page and reused.
361
362 //      Access macros to address slot and key values from the page
363 //      Page slots use 1 based indexing.
364
365 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
366 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
367 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
368
369 void bt_putid(unsigned char *dest, uid id)
370 {
371 int i = BtId;
372
373         while( i-- )
374                 dest[i] = (unsigned char)id, id >>= 8;
375 }
376
377 uid bt_getid(unsigned char *src)
378 {
379 uid id = 0;
380 int i;
381
382         for( i = 0; i < BtId; i++ )
383                 id <<= 8, id |= *src++; 
384
385         return id;
386 }
387
388 uid bt_newdup (BtDb *bt)
389 {
390 #ifdef unix
391         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
392 #else
393         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
394 #endif
395 }
396
397 //      Phase-Fair reader/writer lock implementation
398
399 void WriteLock (RWLock *lock)
400 {
401 ushort w, r, tix;
402
403 #ifdef unix
404         tix = __sync_fetch_and_add (lock->ticket, 1);
405 #else
406         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
407 #endif
408         // wait for our ticket to come up
409
410         while( tix != lock->serving[0] )
411 #ifdef unix
412                 sched_yield();
413 #else
414                 SwitchToThread ();
415 #endif
416
417         w = PRES | (tix & PHID);
418 #ifdef  unix
419         r = __sync_fetch_and_add (lock->rin, w);
420 #else
421         r = _InterlockedExchangeAdd16 (lock->rin, w);
422 #endif
423         while( r != *lock->rout )
424 #ifdef unix
425                 sched_yield();
426 #else
427                 SwitchToThread();
428 #endif
429 }
430
431 void WriteRelease (RWLock *lock)
432 {
433 #ifdef unix
434         __sync_fetch_and_and (lock->rin, ~MASK);
435 #else
436         _InterlockedAnd16 (lock->rin, ~MASK);
437 #endif
438         lock->serving[0]++;
439 }
440
441 void ReadLock (RWLock *lock)
442 {
443 ushort w;
444 #ifdef unix
445         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
446 #else
447         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
448 #endif
449         if( w )
450           while( w == (*lock->rin & MASK) )
451 #ifdef unix
452                 sched_yield ();
453 #else
454                 SwitchToThread ();
455 #endif
456 }
457
458 void ReadRelease (RWLock *lock)
459 {
460 #ifdef unix
461         __sync_fetch_and_add (lock->rout, RINC);
462 #else
463         _InterlockedExchangeAdd16 (lock->rout, RINC);
464 #endif
465 }
466
467 //      Spin Latch Manager
468
469 //      wait until write lock mode is clear
470 //      and add 1 to the share count
471
472 void bt_spinreadlock(BtSpinLatch *latch)
473 {
474 ushort prev;
475
476   do {
477 #ifdef unix
478         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
479 #else
480         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
481 #endif
482         //  see if exclusive request is granted or pending
483
484         if( !(prev & BOTH) )
485                 return;
486 #ifdef unix
487         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
488 #else
489         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
490 #endif
491 #ifdef  unix
492   } while( sched_yield(), 1 );
493 #else
494   } while( SwitchToThread(), 1 );
495 #endif
496 }
497
498 //      wait for other read and write latches to relinquish
499
500 void bt_spinwritelock(BtSpinLatch *latch)
501 {
502 ushort prev;
503
504   do {
505 #ifdef  unix
506         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
507 #else
508         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
509 #endif
510         if( !(prev & XCL) )
511           if( !(prev & ~BOTH) )
512                 return;
513           else
514 #ifdef unix
515                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
516 #else
517                 _InterlockedAnd16((ushort *)latch, ~XCL);
518 #endif
519 #ifdef  unix
520   } while( sched_yield(), 1 );
521 #else
522   } while( SwitchToThread(), 1 );
523 #endif
524 }
525
526 //      try to obtain write lock
527
528 //      return 1 if obtained,
529 //              0 otherwise
530
531 int bt_spinwritetry(BtSpinLatch *latch)
532 {
533 ushort prev;
534
535 #ifdef  unix
536         prev = __sync_fetch_and_or((ushort *)latch, XCL);
537 #else
538         prev = _InterlockedOr16((ushort *)latch, XCL);
539 #endif
540         //      take write access if all bits are clear
541
542         if( !(prev & XCL) )
543           if( !(prev & ~BOTH) )
544                 return 1;
545           else
546 #ifdef unix
547                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
548 #else
549                 _InterlockedAnd16((ushort *)latch, ~XCL);
550 #endif
551         return 0;
552 }
553
554 //      clear write mode
555
556 void bt_spinreleasewrite(BtSpinLatch *latch)
557 {
558 #ifdef unix
559         __sync_fetch_and_and((ushort *)latch, ~BOTH);
560 #else
561         _InterlockedAnd16((ushort *)latch, ~BOTH);
562 #endif
563 }
564
565 //      decrement reader count
566
567 void bt_spinreleaseread(BtSpinLatch *latch)
568 {
569 #ifdef unix
570         __sync_fetch_and_add((ushort *)latch, -SHARE);
571 #else
572         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
573 #endif
574 }
575
576 //      read page from permanent location in Btree file
577
578 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
579 {
580 off64_t off = page_no << mgr->page_bits;
581
582 #ifdef unix
583         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
584                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
585                 return BTERR_read;
586         }
587 #else
588 OVERLAPPED ovl[1];
589 uint amt[1];
590
591         memset (ovl, 0, sizeof(OVERLAPPED));
592         ovl->Offset = off;
593         ovl->OffsetHigh = off >> 32;
594
595         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
596                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
597                 return BTERR_read;
598         }
599         if( *amt <  mgr->page_size ) {
600                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
601                 return BTERR_read;
602         }
603 #endif
604         return 0;
605 }
606
607 //      write page to permanent location in Btree file
608 //      clear the dirty bit
609
610 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
611 {
612 off64_t off = page_no << mgr->page_bits;
613
614 #ifdef unix
615         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
616                 return BTERR_wrt;
617 #else
618 OVERLAPPED ovl[1];
619 uint amt[1];
620
621         memset (ovl, 0, sizeof(OVERLAPPED));
622         ovl->Offset = off;
623         ovl->OffsetHigh = off >> 32;
624
625         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
626                 return BTERR_wrt;
627
628         if( *amt <  mgr->page_size )
629                 return BTERR_wrt;
630 #endif
631         return 0;
632 }
633
634 //      link latch table entry into head of latch hash table
635
636 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
637 {
638 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
639 BtLatchSet *latch = bt->mgr->latchsets + slot;
640
641         if( latch->next = bt->mgr->hashtable[hashidx].slot )
642                 bt->mgr->latchsets[latch->next].prev = slot;
643
644         bt->mgr->hashtable[hashidx].slot = slot;
645         latch->page_no = page_no;
646         latch->slot = slot;
647         latch->prev = 0;
648         latch->pin = 1;
649
650         if( loadit )
651           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
652                 return bt->err;
653           else
654                 bt->reads++;
655
656         return bt->err = 0;
657 }
658
659 //      set CLOCK bit in latch
660 //      decrement pin count
661
662 void bt_unpinlatch (BtLatchSet *latch)
663 {
664 #ifdef unix
665         if( ~latch->pin & CLOCK_bit )
666                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
667         __sync_fetch_and_add(&latch->pin, -1);
668 #else
669         if( ~latch->pin & CLOCK_bit )
670                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
671         _InterlockedDecrement16 (&latch->pin);
672 #endif
673 }
674
675 //  return the btree cached page address
676
677 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
678 {
679 BtPage page = (BtPage)(((uid)latch->slot << bt->mgr->page_bits) + bt->mgr->pagepool);
680
681         return page;
682 }
683
684 //      find existing latchset or inspire new one
685 //      return with latchset pinned
686
687 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
688 {
689 uint hashidx = page_no % bt->mgr->latchhash;
690 BtLatchSet *latch;
691 uint slot, idx;
692 uint lvl, cnt;
693 off64_t off;
694 uint amt[1];
695 BtPage page;
696
697   //  try to find our entry
698
699   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
700
701   if( slot = bt->mgr->hashtable[hashidx].slot ) do
702   {
703         latch = bt->mgr->latchsets + slot;
704         if( page_no == latch->page_no )
705                 break;
706   } while( slot = latch->next );
707
708   //  found our entry
709   //  increment clock
710
711   if( slot ) {
712         latch = bt->mgr->latchsets + slot;
713 #ifdef unix
714         __sync_fetch_and_add(&latch->pin, 1);
715 #else
716         _InterlockedIncrement16 (&latch->pin);
717 #endif
718         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
719         return latch;
720   }
721
722         //  see if there are any unused pool entries
723 #ifdef unix
724         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
725 #else
726         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
727 #endif
728
729         if( slot < bt->mgr->latchtotal ) {
730                 latch = bt->mgr->latchsets + slot;
731                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
732                         return NULL;
733                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
734                 return latch;
735         }
736
737 #ifdef unix
738         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
739 #else
740         _InterlockedDecrement (&bt->mgr->latchdeployed);
741 #endif
742   //  find and reuse previous entry on victim
743
744   while( 1 ) {
745 #ifdef unix
746         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
747 #else
748         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
749 #endif
750         // try to get write lock on hash chain
751         //      skip entry if not obtained
752         //      or has outstanding pins
753
754         slot %= bt->mgr->latchtotal;
755
756         if( !slot )
757                 continue;
758
759         latch = bt->mgr->latchsets + slot;
760         idx = latch->page_no % bt->mgr->latchhash;
761
762         //      see we are on same chain as hashidx
763
764         if( idx == hashidx )
765                 continue;
766
767         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
768                 continue;
769
770         //  skip this slot if it is pinned
771         //      or the CLOCK bit is set
772
773         if( latch->pin ) {
774           if( latch->pin & CLOCK_bit ) {
775 #ifdef unix
776                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
777 #else
778                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
779 #endif
780           }
781           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
782           continue;
783         }
784
785         //  update permanent page area in btree from buffer pool
786
787         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
788
789         if( latch->dirty )
790           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
791                 return NULL;
792           else
793                 latch->dirty = 0, bt->writes++;
794
795         //  unlink our available slot from its hash chain
796
797         if( latch->prev )
798                 bt->mgr->latchsets[latch->prev].next = latch->next;
799         else
800                 bt->mgr->hashtable[idx].slot = latch->next;
801
802         if( latch->next )
803                 bt->mgr->latchsets[latch->next].prev = latch->prev;
804
805         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
806
807         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
808                 return NULL;
809
810         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
811         return latch;
812   }
813 }
814
815 void bt_mgrclose (BtMgr *mgr)
816 {
817 BtLatchSet *latch;
818 uint num = 0;
819 BtPage page;
820 uint slot;
821
822         //      flush dirty pool pages to the btree
823
824         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
825                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
826                 latch = mgr->latchsets + slot;
827
828                 if( latch->dirty ) {
829                         bt_writepage(mgr, page, latch->page_no);
830                         latch->dirty = 0, num++;
831                 }
832 //              madvise (page, mgr->page_size, MADV_DONTNEED);
833         }
834
835         fprintf(stderr, "%d buffer pool pages flushed\n", num);
836
837 #ifdef unix
838         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
839         munmap (mgr->pagezero, mgr->page_size);
840 #else
841         FlushViewOfFile(mgr->pagezero, 0);
842         UnmapViewOfFile(mgr->pagezero);
843         UnmapViewOfFile(mgr->hashtable);
844         CloseHandle(mgr->halloc);
845         CloseHandle(mgr->hpool);
846 #endif
847 #ifdef unix
848         close (mgr->idx);
849         free (mgr);
850 #else
851         FlushFileBuffers(mgr->idx);
852         CloseHandle(mgr->idx);
853         GlobalFree (mgr);
854 #endif
855 }
856
857 //      close and release memory
858
859 void bt_close (BtDb *bt)
860 {
861 #ifdef unix
862         if( bt->mem )
863                 free (bt->mem);
864 #else
865         if( bt->mem)
866                 VirtualFree (bt->mem, 0, MEM_RELEASE);
867 #endif
868         free (bt);
869 }
870
871 //  open/create new btree buffer manager
872
873 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
874 //              size of page pool (e.g. 262144)
875
876 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
877 {
878 uint lvl, attr, last, slot, idx;
879 uint nlatchpage, latchhash;
880 unsigned char value[BtId];
881 int flag, initit = 0;
882 BtPageZero *pagezero;
883 off64_t size;
884 uint amt[1];
885 BtMgr* mgr;
886 BtKey* key;
887 BtVal *val;
888
889         // determine sanity of page size and buffer pool
890
891         if( bits > BT_maxbits )
892                 bits = BT_maxbits;
893         else if( bits < BT_minbits )
894                 bits = BT_minbits;
895
896         if( nodemax < 16 ) {
897                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
898                 return NULL;
899         }
900
901 #ifdef unix
902         mgr = calloc (1, sizeof(BtMgr));
903
904         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
905
906         if( mgr->idx == -1 ) {
907                 fprintf (stderr, "Unable to open btree file\n");
908                 return free(mgr), NULL;
909         }
910 #else
911         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
912         attr = FILE_ATTRIBUTE_NORMAL;
913         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
914
915         if( mgr->idx == INVALID_HANDLE_VALUE )
916                 return GlobalFree(mgr), NULL;
917 #endif
918
919 #ifdef unix
920         pagezero = valloc (BT_maxpage);
921         *amt = 0;
922
923         // read minimum page size to get root info
924         //      to support raw disk partition files
925         //      check if bits == 0 on the disk.
926
927         if( size = lseek (mgr->idx, 0L, 2) )
928                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
929                         if( pagezero->alloc->bits )
930                                 bits = pagezero->alloc->bits;
931                         else
932                                 initit = 1;
933                 else
934                         return free(mgr), free(pagezero), NULL;
935         else
936                 initit = 1;
937 #else
938         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
939         size = GetFileSize(mgr->idx, amt);
940
941         if( size || *amt ) {
942                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
943                         return bt_mgrclose (mgr), NULL;
944                 bits = pagezero->alloc->bits;
945         } else
946                 initit = 1;
947 #endif
948
949         mgr->page_size = 1 << bits;
950         mgr->page_bits = bits;
951
952         //  calculate number of latch hash table entries
953
954         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
955         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
956
957         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
958         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
959         mgr->latchtotal = nodemax;
960
961         if( !initit )
962                 goto mgrlatch;
963
964         // initialize an empty b-tree with latch page, root page, page of leaves
965         // and page(s) of latches and page pool cache
966
967         memset (pagezero, 0, 1 << bits);
968         pagezero->alloc->bits = mgr->page_bits;
969         bt_putid(pagezero->alloc->right, MIN_lvl+1);
970
971         //  initialize left-most LEAF page in
972         //      alloc->left.
973
974         bt_putid (pagezero->alloc->left, LEAF_page);
975
976         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
977                 fprintf (stderr, "Unable to create btree page zero\n");
978                 return bt_mgrclose (mgr), NULL;
979         }
980
981         memset (pagezero, 0, 1 << bits);
982         pagezero->alloc->bits = mgr->page_bits;
983
984         for( lvl=MIN_lvl; lvl--; ) {
985                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
986                 key = keyptr(pagezero->alloc, 1);
987                 key->len = 2;           // create stopper key
988                 key->key[0] = 0xff;
989                 key->key[1] = 0xff;
990
991                 bt_putid(value, MIN_lvl - lvl + 1);
992                 val = valptr(pagezero->alloc, 1);
993                 val->len = lvl ? BtId : 0;
994                 memcpy (val->value, value, val->len);
995
996                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
997                 pagezero->alloc->lvl = lvl;
998                 pagezero->alloc->cnt = 1;
999                 pagezero->alloc->act = 1;
1000
1001                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1002                         fprintf (stderr, "Unable to create btree page zero\n");
1003                         return bt_mgrclose (mgr), NULL;
1004                 }
1005         }
1006
1007 mgrlatch:
1008 #ifdef unix
1009         free (pagezero);
1010 #else
1011         VirtualFree (pagezero, 0, MEM_RELEASE);
1012 #endif
1013 #ifdef unix
1014         // mlock the pagezero page
1015
1016         flag = PROT_READ | PROT_WRITE;
1017         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1018         if( mgr->pagezero == MAP_FAILED ) {
1019                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1020                 return bt_mgrclose (mgr), NULL;
1021         }
1022         mlock (mgr->pagezero, mgr->page_size);
1023
1024         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1025         if( mgr->hashtable == MAP_FAILED ) {
1026                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1027                 return bt_mgrclose (mgr), NULL;
1028         }
1029 #else
1030         flag = PAGE_READWRITE;
1031         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1032         if( !mgr->halloc ) {
1033                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1034                 return bt_mgrclose (mgr), NULL;
1035         }
1036
1037         flag = FILE_MAP_WRITE;
1038         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1039         if( !mgr->pagezero ) {
1040                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1041                 return bt_mgrclose (mgr), NULL;
1042         }
1043
1044         flag = PAGE_READWRITE;
1045         size = (uid)mgr->nlatchpage << mgr->page_bits;
1046         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1047         if( !mgr->hpool ) {
1048                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1049                 return bt_mgrclose (mgr), NULL;
1050         }
1051
1052         flag = FILE_MAP_WRITE;
1053         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1054         if( !mgr->hashtable ) {
1055                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1056                 return bt_mgrclose (mgr), NULL;
1057         }
1058 #endif
1059
1060         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1061         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1062
1063         return mgr;
1064 }
1065
1066 //      open BTree access method
1067 //      based on buffer manager
1068
1069 BtDb *bt_open (BtMgr *mgr)
1070 {
1071 BtDb *bt = malloc (sizeof(*bt));
1072
1073         memset (bt, 0, sizeof(*bt));
1074         bt->mgr = mgr;
1075 #ifdef unix
1076         bt->mem = valloc (2 *mgr->page_size);
1077 #else
1078         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1079 #endif
1080         bt->frame = (BtPage)bt->mem;
1081         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1082         return bt;
1083 }
1084
1085 //  compare two keys, returning > 0, = 0, or < 0
1086 //  as the comparison value
1087
1088 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1089 {
1090 uint len1 = key1->len;
1091 int ans;
1092
1093         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1094                 return ans;
1095
1096         if( len1 > len2 )
1097                 return 1;
1098         if( len1 < len2 )
1099                 return -1;
1100
1101         return 0;
1102 }
1103
1104 // place write, read, or parent lock on requested page_no.
1105
1106 void bt_lockpage(BtLock mode, BtLatchSet *set)
1107 {
1108         switch( mode ) {
1109         case BtLockRead:
1110                 ReadLock (set->readwr);
1111                 break;
1112         case BtLockWrite:
1113                 WriteLock (set->readwr);
1114                 break;
1115         case BtLockAccess:
1116                 ReadLock (set->access);
1117                 break;
1118         case BtLockDelete:
1119                 WriteLock (set->access);
1120                 break;
1121         case BtLockParent:
1122                 WriteLock (set->parent);
1123                 break;
1124         }
1125 }
1126
1127 // remove write, read, or parent lock on requested page
1128
1129 void bt_unlockpage(BtLock mode, BtLatchSet *set)
1130 {
1131         switch( mode ) {
1132         case BtLockRead:
1133                 ReadRelease (set->readwr);
1134                 break;
1135         case BtLockWrite:
1136                 WriteRelease (set->readwr);
1137                 break;
1138         case BtLockAccess:
1139                 ReadRelease (set->access);
1140                 break;
1141         case BtLockDelete:
1142                 WriteRelease (set->access);
1143                 break;
1144         case BtLockParent:
1145                 WriteRelease (set->parent);
1146                 break;
1147         }
1148 }
1149
1150 //      allocate a new page
1151 //      return with page latched.
1152
1153 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1154 {
1155 int blk;
1156
1157         //      lock allocation page
1158
1159         bt_spinwritelock(bt->mgr->lock);
1160
1161         // use empty chain first
1162         // else allocate empty page
1163
1164         if( set->page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1165                 if( set->latch = bt_pinlatch (bt, set->page_no, 1) )
1166                         set->page = bt_mappage (bt, set->latch);
1167                 else
1168                         return bt->err = BTERR_struct, -1;
1169
1170                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1171                 bt_spinreleasewrite(bt->mgr->lock);
1172
1173                 memcpy (set->page, contents, bt->mgr->page_size);
1174                 set->latch->dirty = 1;
1175                 return 0;
1176         }
1177
1178         set->page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1179         bt_putid(bt->mgr->pagezero->alloc->right, set->page_no+1);
1180
1181         // unlock allocation latch
1182
1183         bt_spinreleasewrite(bt->mgr->lock);
1184
1185         //      don't load cache from btree page
1186
1187         if( set->latch = bt_pinlatch (bt, set->page_no, 0) )
1188                 set->page = bt_mappage (bt, set->latch);
1189         else
1190                 return bt->err = BTERR_struct;
1191
1192         memcpy (set->page, contents, bt->mgr->page_size);
1193         set->latch->dirty = 1;
1194         return 0;
1195 }
1196
1197 //  find slot in page for given key at a given level
1198
1199 int bt_findslot (BtPageSet *set, unsigned char *key, uint len)
1200 {
1201 uint diff, higher = set->page->cnt, low = 1, slot;
1202 uint good = 0;
1203
1204         //        make stopper key an infinite fence value
1205
1206         if( bt_getid (set->page->right) )
1207                 higher++;
1208         else
1209                 good++;
1210
1211         //      low is the lowest candidate.
1212         //  loop ends when they meet
1213
1214         //  higher is already
1215         //      tested as .ge. the passed key.
1216
1217         while( diff = higher - low ) {
1218                 slot = low + ( diff >> 1 );
1219                 if( keycmp (keyptr(set->page, slot), key, len) < 0 )
1220                         low = slot + 1;
1221                 else
1222                         higher = slot, good++;
1223         }
1224
1225         //      return zero if key is on right link page
1226
1227         return good ? higher : 0;
1228 }
1229
1230 //  find and load page at given level for given key
1231 //      leave page rd or wr locked as requested
1232
1233 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1234 {
1235 uid page_no = ROOT_page, prevpage = 0;
1236 uint drill = 0xff, slot;
1237 BtLatchSet *prevlatch;
1238 uint mode, prevmode;
1239
1240   //  start at root of btree and drill down
1241
1242   do {
1243         // determine lock mode of drill level
1244         mode = (drill == lvl) ? lock : BtLockRead; 
1245
1246         if( set->latch = bt_pinlatch (bt, page_no, 1) )
1247                 set->page_no = page_no;
1248         else
1249                 return 0;
1250
1251         // obtain access lock using lock chaining with Access mode
1252
1253         if( page_no > ROOT_page )
1254           bt_lockpage(BtLockAccess, set->latch);
1255
1256         set->page = bt_mappage (bt, set->latch);
1257
1258         //      release & unpin parent page
1259
1260         if( prevpage ) {
1261           bt_unlockpage(prevmode, prevlatch);
1262           bt_unpinlatch (prevlatch);
1263           prevpage = 0;
1264         }
1265
1266         // obtain read lock using lock chaining
1267
1268         bt_lockpage(mode, set->latch);
1269
1270         if( set->page->free )
1271                 return bt->err = BTERR_struct, 0;
1272
1273         if( page_no > ROOT_page )
1274           bt_unlockpage(BtLockAccess, set->latch);
1275
1276         // re-read and re-lock root after determining actual level of root
1277
1278         if( set->page->lvl != drill) {
1279                 if( set->page_no != ROOT_page )
1280                         return bt->err = BTERR_struct, 0;
1281                         
1282                 drill = set->page->lvl;
1283
1284                 if( lock != BtLockRead && drill == lvl ) {
1285                   bt_unlockpage(mode, set->latch);
1286                   bt_unpinlatch (set->latch);
1287                   continue;
1288                 }
1289         }
1290
1291         prevpage = set->page_no;
1292         prevlatch = set->latch;
1293         prevmode = mode;
1294
1295         //  find key on page at this level
1296         //  and descend to requested level
1297
1298         if( set->page->kill )
1299           goto slideright;
1300
1301         if( slot = bt_findslot (set, key, len) ) {
1302           if( drill == lvl )
1303                 return slot;
1304
1305           while( slotptr(set->page, slot)->dead )
1306                 if( slot++ < set->page->cnt )
1307                         continue;
1308                 else
1309                         goto slideright;
1310
1311           page_no = bt_getid(valptr(set->page, slot)->value);
1312           drill--;
1313           continue;
1314         }
1315
1316         //  or slide right into next page
1317
1318 slideright:
1319         page_no = bt_getid(set->page->right);
1320
1321   } while( page_no );
1322
1323   // return error on end of right chain
1324
1325   bt->err = BTERR_struct;
1326   return 0;     // return error
1327 }
1328
1329 //      return page to free list
1330 //      page must be delete & write locked
1331
1332 void bt_freepage (BtDb *bt, BtPageSet *set)
1333 {
1334         //      lock allocation page
1335
1336         bt_spinwritelock (bt->mgr->lock);
1337
1338         //      store chain
1339         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1340         bt_putid(bt->mgr->pagezero->chain, set->page_no);
1341         set->latch->dirty = 1;
1342         set->page->free = 1;
1343
1344         // unlock released page
1345
1346         bt_unlockpage (BtLockDelete, set->latch);
1347         bt_unlockpage (BtLockWrite, set->latch);
1348         bt_unpinlatch (set->latch);
1349
1350         // unlock allocation page
1351
1352         bt_spinreleasewrite (bt->mgr->lock);
1353 }
1354
1355 //      a fence key was deleted from a page
1356 //      push new fence value upwards
1357
1358 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1359 {
1360 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1361 unsigned char value[BtId];
1362 BtKey* ptr;
1363 uint idx;
1364
1365         //      remove the old fence value
1366
1367         ptr = keyptr(set->page, set->page->cnt);
1368         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1369         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1370         set->latch->dirty = 1;
1371
1372         //  cache new fence value
1373
1374         ptr = keyptr(set->page, set->page->cnt);
1375         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1376
1377         bt_lockpage (BtLockParent, set->latch);
1378         bt_unlockpage (BtLockWrite, set->latch);
1379
1380         //      insert new (now smaller) fence key
1381
1382         bt_putid (value, set->page_no);
1383         ptr = (BtKey*)leftkey;
1384
1385         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1386           return bt->err;
1387
1388         //      now delete old fence key
1389
1390         ptr = (BtKey*)rightkey;
1391
1392         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1393                 return bt->err;
1394
1395         bt_unlockpage (BtLockParent, set->latch);
1396         bt_unpinlatch(set->latch);
1397         return 0;
1398 }
1399
1400 //      root has a single child
1401 //      collapse a level from the tree
1402
1403 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1404 {
1405 BtPageSet child[1];
1406 uint idx;
1407
1408   // find the child entry and promote as new root contents
1409
1410   do {
1411         for( idx = 0; idx++ < root->page->cnt; )
1412           if( !slotptr(root->page, idx)->dead )
1413                 break;
1414
1415         child->page_no = bt_getid (valptr(root->page, idx)->value);
1416
1417         if( child->latch = bt_pinlatch (bt, child->page_no, 1) )
1418                 child->page = bt_mappage (bt, child->latch);
1419         else
1420                 return bt->err;
1421
1422         bt_lockpage (BtLockDelete, child->latch);
1423         bt_lockpage (BtLockWrite, child->latch);
1424
1425         memcpy (root->page, child->page, bt->mgr->page_size);
1426         root->latch->dirty = 1;
1427
1428         bt_freepage (bt, child);
1429
1430   } while( root->page->lvl > 1 && root->page->act == 1 );
1431
1432   bt_unlockpage (BtLockWrite, root->latch);
1433   bt_unpinlatch (root->latch);
1434   return 0;
1435 }
1436
1437 //      maintain reverse scan pointers by
1438 //      linking left pointer of far right node
1439
1440 BTERR bt_linkleft (BtDb *bt, uid left_page_no, uid right_page_no)
1441 {
1442 BtPageSet right2[1];
1443
1444         //      keep track of rightmost leaf page
1445
1446         if( !right_page_no ) {
1447           bt_putid (bt->mgr->pagezero->alloc->left, left_page_no);
1448           return 0;
1449         }
1450
1451         //      link right page to left page
1452
1453         if( right2->latch = bt_pinlatch (bt, right_page_no, 1) )
1454                 right2->page = bt_mappage (bt, right2->latch);
1455         else
1456                 return bt->err;
1457
1458         bt_lockpage (BtLockWrite, right2->latch);
1459
1460         bt_putid(right2->page->left, left_page_no);
1461         bt_unlockpage (BtLockWrite, right2->latch);
1462         bt_unpinlatch (right2->latch);
1463         return 0;
1464 }
1465
1466 //  find and delete key on page by marking delete flag bit
1467 //  if page becomes empty, delete it from the btree
1468
1469 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1470 {
1471 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1472 uint slot, idx, found, fence;
1473 BtPageSet set[1], right[1];
1474 unsigned char value[BtId];
1475 BtKey *ptr, *tst;
1476 BtVal *val;
1477
1478         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1479                 ptr = keyptr(set->page, slot);
1480         else
1481                 return bt->err;
1482
1483         // if librarian slot, advance to real slot
1484
1485         if( slotptr(set->page, slot)->type == Librarian )
1486                 ptr = keyptr(set->page, ++slot);
1487
1488         fence = slot == set->page->cnt;
1489
1490         // if key is found delete it, otherwise ignore request
1491
1492         if( found = !keycmp (ptr, key, len) )
1493           if( found = slotptr(set->page, slot)->dead == 0 ) {
1494                 val = valptr(set->page,slot);
1495                 slotptr(set->page, slot)->dead = 1;
1496                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1497                 set->page->act--;
1498
1499                 // collapse empty slots beneath the fence
1500
1501                 while( idx = set->page->cnt - 1 )
1502                   if( slotptr(set->page, idx)->dead ) {
1503                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1504                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1505                   } else
1506                         break;
1507           }
1508
1509         //      did we delete a fence key in an upper level?
1510
1511         if( found && lvl && set->page->act && fence )
1512           if( bt_fixfence (bt, set, lvl) )
1513                 return bt->err;
1514           else
1515                 return bt->found = found, 0;
1516
1517         //      do we need to collapse root?
1518
1519         if( lvl > 1 && set->page_no == ROOT_page && set->page->act == 1 )
1520           if( bt_collapseroot (bt, set) )
1521                 return bt->err;
1522           else
1523                 return bt->found = found, 0;
1524
1525         //      return if page is not empty
1526
1527         if( set->page->act ) {
1528                 set->latch->dirty = 1;
1529                 bt_unlockpage(BtLockWrite, set->latch);
1530                 bt_unpinlatch (set->latch);
1531                 return bt->found = found, 0;
1532         }
1533
1534         //      cache copy of fence key
1535         //      to post in parent
1536
1537         ptr = keyptr(set->page, set->page->cnt);
1538         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1539
1540         //      obtain lock on right page
1541
1542         right->page_no = bt_getid(set->page->right);
1543
1544         if( right->latch = bt_pinlatch (bt, right->page_no, 1) )
1545                 right->page = bt_mappage (bt, right->latch);
1546         else
1547                 return 0;
1548
1549         bt_lockpage (BtLockWrite, right->latch);
1550
1551         if( right->page->kill )
1552                 return bt->err = BTERR_struct;
1553
1554         // transfer left link
1555
1556         memcpy (right->page->left, set->page->left, BtId);
1557
1558         // pull contents of right peer into our empty page
1559
1560         memcpy (set->page, right->page, bt->mgr->page_size);
1561         set->latch->dirty = 1;
1562
1563         // update left link
1564
1565         if( !lvl )
1566           if( bt_linkleft (bt, set->page_no, bt_getid (set->page->right)) )
1567                 return bt->err;
1568
1569         // cache copy of key to update
1570
1571         ptr = keyptr(right->page, right->page->cnt);
1572         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1573
1574         // mark right page deleted and point it to left page
1575         //      until we can post parent updates
1576
1577         bt_putid (right->page->right, set->page_no);
1578         right->latch->dirty = 1;
1579         right->page->kill = 1;
1580
1581         bt_lockpage (BtLockParent, right->latch);
1582         bt_unlockpage (BtLockWrite, right->latch);
1583
1584         bt_lockpage (BtLockParent, set->latch);
1585         bt_unlockpage (BtLockWrite, set->latch);
1586
1587         // redirect higher key directly to our new node contents
1588
1589         bt_putid (value, set->page_no);
1590         ptr = (BtKey*)higherfence;
1591
1592         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1593           return bt->err;
1594
1595         //      delete old lower key to our node
1596
1597         ptr = (BtKey*)lowerfence;
1598
1599         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1600           return bt->err;
1601
1602         //      obtain delete and write locks to right node
1603
1604         bt_unlockpage (BtLockParent, right->latch);
1605         bt_lockpage (BtLockDelete, right->latch);
1606         bt_lockpage (BtLockWrite, right->latch);
1607         bt_freepage (bt, right);
1608
1609         bt_unlockpage (BtLockParent, set->latch);
1610         bt_unpinlatch (set->latch);
1611         bt->found = found;
1612         return 0;
1613 }
1614
1615 BtKey *bt_foundkey (BtDb *bt)
1616 {
1617         return (BtKey*)(bt->key);
1618 }
1619
1620 //      advance to next slot
1621
1622 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1623 {
1624 BtLatchSet *prevlatch;
1625 uid page_no;
1626
1627         if( slot < set->page->cnt )
1628                 return slot + 1;
1629
1630         prevlatch = set->latch;
1631
1632         if( page_no = bt_getid(set->page->right) )
1633           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1634                 set->page = bt_mappage (bt, set->latch);
1635           else
1636                 return 0;
1637         else
1638           return bt->err = BTERR_struct, 0;
1639
1640         // obtain access lock using lock chaining with Access mode
1641
1642         bt_lockpage(BtLockAccess, set->latch);
1643
1644         bt_unlockpage(BtLockRead, prevlatch);
1645         bt_unpinlatch (prevlatch);
1646
1647         bt_lockpage(BtLockRead, set->latch);
1648         bt_unlockpage(BtLockAccess, set->latch);
1649
1650         set->page_no = page_no;
1651         return 1;
1652 }
1653
1654 //      find unique key or first duplicate key in
1655 //      leaf level and return number of value bytes
1656 //      or (-1) if not found.  Setup key for bt_foundkey
1657
1658 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1659 {
1660 BtPageSet set[1];
1661 uint len, slot;
1662 int ret = -1;
1663 BtKey *ptr;
1664 BtVal *val;
1665
1666   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1667    do {
1668         ptr = keyptr(set->page, slot);
1669
1670         //      skip librarian slot place holder
1671
1672         if( slotptr(set->page, slot)->type == Librarian )
1673                 ptr = keyptr(set->page, ++slot);
1674
1675         //      return actual key found
1676
1677         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1678         len = ptr->len;
1679
1680         if( slotptr(set->page, slot)->type == Duplicate )
1681                 len -= BtId;
1682
1683         //      not there if we reach the stopper key
1684
1685         if( slot == set->page->cnt )
1686           if( !bt_getid (set->page->right) )
1687                 break;
1688
1689         // if key exists, return >= 0 value bytes copied
1690         //      otherwise return (-1)
1691
1692         if( slotptr(set->page, slot)->dead )
1693                 continue;
1694
1695         if( keylen == len )
1696           if( !memcmp (ptr->key, key, len) ) {
1697                 val = valptr (set->page,slot);
1698                 if( valmax > val->len )
1699                         valmax = val->len;
1700                 memcpy (value, val->value, valmax);
1701                 ret = valmax;
1702           }
1703
1704         break;
1705
1706    } while( slot = bt_findnext (bt, set, slot) );
1707
1708   bt_unlockpage (BtLockRead, set->latch);
1709   bt_unpinlatch (set->latch);
1710   return ret;
1711 }
1712
1713 //      check page for space available,
1714 //      clean if necessary and return
1715 //      0 - page needs splitting
1716 //      >0  new slot value
1717
1718 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1719 {
1720 uint nxt = bt->mgr->page_size;
1721 BtPage page = set->page;
1722 uint cnt = 0, idx = 0;
1723 uint max = page->cnt;
1724 uint newslot = max;
1725 BtKey *key;
1726 BtVal *val;
1727
1728         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1729                 return slot;
1730
1731         //      skip cleanup and proceed to split
1732         //      if there's not enough garbage
1733         //      to bother with.
1734
1735         if( page->garbage < nxt / 5 )
1736                 return 0;
1737
1738         memcpy (bt->frame, page, bt->mgr->page_size);
1739
1740         // skip page info and set rest of page to zero
1741
1742         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1743         set->latch->dirty = 1;
1744         page->garbage = 0;
1745         page->act = 0;
1746
1747         // clean up page first by
1748         // removing deleted keys
1749
1750         while( cnt++ < max ) {
1751                 if( cnt == slot )
1752                         newslot = idx + 2;
1753                 if( cnt < max && slotptr(bt->frame,cnt)->dead )
1754                         continue;
1755
1756                 // copy the value across
1757
1758                 val = valptr(bt->frame, cnt);
1759                 nxt -= val->len + sizeof(BtVal);
1760                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1761
1762                 // copy the key across
1763
1764                 key = keyptr(bt->frame, cnt);
1765                 nxt -= key->len + sizeof(BtKey);
1766                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1767
1768                 // make a librarian slot
1769
1770                 if( idx ) {
1771                         slotptr(page, ++idx)->off = nxt;
1772                         slotptr(page, idx)->type = Librarian;
1773                         slotptr(page, idx)->dead = 1;
1774                 }
1775
1776                 // set up the slot
1777
1778                 slotptr(page, ++idx)->off = nxt;
1779                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1780
1781                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1782                         page->act++;
1783         }
1784
1785         page->min = nxt;
1786         page->cnt = idx;
1787
1788         //      see if page has enough space now, or does it need splitting?
1789
1790         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1791                 return newslot;
1792
1793         return 0;
1794 }
1795
1796 // split the root and raise the height of the btree
1797
1798 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtKey *leftkey, BtPageSet *right)
1799 {
1800 uint nxt = bt->mgr->page_size;
1801 unsigned char value[BtId];
1802 BtPageSet left[1];
1803 BtKey *ptr;
1804 BtVal *val;
1805
1806         //  Obtain an empty page to use, and copy the current
1807         //  root contents into it, e.g. lower keys
1808
1809         if( bt_newpage(bt, left, root->page) )
1810                 return bt->err;
1811
1812         bt_unpinlatch (left->latch);
1813
1814         // set left from higher to lower page
1815
1816         bt_putid (right->page->left, left->page_no);
1817         bt_unpinlatch (right->latch);
1818
1819         // preserve the page info at the bottom
1820         // of higher keys and set rest to zero
1821
1822         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1823
1824         // insert stopper key at top of newroot page
1825         // and increase the root height
1826
1827         nxt -= BtId + sizeof(BtVal);
1828         bt_putid (value, right->page_no);
1829         val = (BtVal *)((unsigned char *)root->page + nxt);
1830         memcpy (val->value, value, BtId);
1831         val->len = BtId;
1832
1833         nxt -= 2 + sizeof(BtKey);
1834         slotptr(root->page, 2)->off = nxt;
1835         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1836         ptr->len = 2;
1837         ptr->key[0] = 0xff;
1838         ptr->key[1] = 0xff;
1839
1840         // insert lower keys page fence key on newroot page as first key
1841
1842         nxt -= BtId + sizeof(BtVal);
1843         bt_putid (value, left->page_no);
1844         val = (BtVal *)((unsigned char *)root->page + nxt);
1845         memcpy (val->value, value, BtId);
1846         val->len = BtId;
1847
1848         nxt -= leftkey->len + sizeof(BtKey);
1849         slotptr(root->page, 1)->off = nxt;
1850         memcpy ((unsigned char *)root->page + nxt, leftkey, leftkey->len + sizeof(BtKey));
1851         
1852         bt_putid(root->page->right, 0);
1853         root->page->min = nxt;          // reset lowest used offset and key count
1854         root->page->cnt = 2;
1855         root->page->act = 2;
1856         root->page->lvl++;
1857
1858         // release and unpin root pages
1859
1860         bt_unlockpage(BtLockWrite, root->latch);
1861         bt_unpinlatch (root->latch);
1862         return 0;
1863 }
1864
1865 //  split already locked full node
1866 //      return unlocked.
1867
1868 BTERR bt_splitpage (BtDb *bt, BtPageSet *set)
1869 {
1870 unsigned char fencekey[BT_keyarray], rightkey[BT_keyarray];
1871 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1872 unsigned char value[BtId];
1873 uint lvl = set->page->lvl;
1874 BtPageSet right[1];
1875 BtKey *key, *ptr;
1876 BtVal *val, *src;
1877 uid right2;
1878 uint prev;
1879
1880         //  split higher half of keys to bt->frame
1881
1882         memset (bt->frame, 0, bt->mgr->page_size);
1883         max = set->page->cnt;
1884         cnt = max / 2;
1885         idx = 0;
1886
1887         while( cnt++ < max ) {
1888                 if( slotptr(set->page, cnt)->dead && cnt < max )
1889                         continue;
1890                 src = valptr(set->page, cnt);
1891                 nxt -= src->len + sizeof(BtVal);
1892                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1893
1894                 key = keyptr(set->page, cnt);
1895                 nxt -= key->len + sizeof(BtKey);
1896                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1897                 memcpy (ptr, key, key->len + sizeof(BtKey));
1898
1899                 //      add librarian slot
1900
1901                 if( idx ) {
1902                         slotptr(bt->frame, ++idx)->off = nxt;
1903                         slotptr(bt->frame, idx)->type = Librarian;
1904                         slotptr(bt->frame, idx)->dead = 1;
1905                 }
1906
1907                 //  add actual slot
1908
1909                 slotptr(bt->frame, ++idx)->off = nxt;
1910                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1911
1912                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1913                         bt->frame->act++;
1914         }
1915
1916         // remember existing fence key for new page to the right
1917
1918         memcpy (rightkey, key, key->len + sizeof(BtKey));
1919
1920         bt->frame->bits = bt->mgr->page_bits;
1921         bt->frame->min = nxt;
1922         bt->frame->cnt = idx;
1923         bt->frame->lvl = lvl;
1924
1925         // link right node
1926
1927         if( set->page_no > ROOT_page ) {
1928                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1929                 bt_putid(bt->frame->left, set->page_no);
1930         }
1931
1932         //      get new free page and write higher keys to it.
1933
1934         if( bt_newpage(bt, right, bt->frame) )
1935                 return bt->err;
1936
1937         // link left node
1938
1939         if( set->page_no > ROOT_page && !lvl )
1940           if( bt_linkleft (bt, right->page_no, bt_getid (set->page->right)) )
1941                 return bt->err;
1942
1943         //      update lower keys to continue in old page
1944
1945         memcpy (bt->frame, set->page, bt->mgr->page_size);
1946         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1947         set->latch->dirty = 1;
1948
1949         nxt = bt->mgr->page_size;
1950         set->page->garbage = 0;
1951         set->page->act = 0;
1952         max /= 2;
1953         cnt = 0;
1954         idx = 0;
1955
1956         if( slotptr(bt->frame, max)->type == Librarian )
1957                 max--;
1958
1959         //  assemble page of smaller keys
1960
1961         while( cnt++ < max ) {
1962                 if( slotptr(bt->frame, cnt)->dead )
1963                         continue;
1964                 val = valptr(bt->frame, cnt);
1965                 nxt -= val->len + sizeof(BtVal);
1966                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1967
1968                 key = keyptr(bt->frame, cnt);
1969                 nxt -= key->len + sizeof(BtKey);
1970                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1971
1972                 //      add librarian slot
1973
1974                 if( idx ) {
1975                         slotptr(set->page, ++idx)->off = nxt;
1976                         slotptr(set->page, idx)->type = Librarian;
1977                         slotptr(set->page, idx)->dead = 1;
1978                 }
1979
1980                 //      add actual slot
1981
1982                 slotptr(set->page, ++idx)->off = nxt;
1983                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
1984                 set->page->act++;
1985         }
1986
1987         // remember fence key for smaller page
1988
1989         memcpy(fencekey, key, key->len + sizeof(BtKey));
1990
1991         bt_putid(set->page->right, right->page_no);
1992         set->page->min = nxt;
1993         set->page->cnt = idx;
1994
1995         // if current page is the root page, split it
1996
1997         if( set->page_no == ROOT_page )
1998                 return bt_splitroot (bt, set, (BtKey *)fencekey, right);
1999
2000         // insert new fences in their parent pages
2001
2002         bt_lockpage (BtLockParent, right->latch);
2003
2004         bt_lockpage (BtLockParent, set->latch);
2005         bt_unlockpage (BtLockWrite, set->latch);
2006
2007         // insert new fence for reformulated left block of smaller keys
2008
2009         ptr = (BtKey*)fencekey;
2010         bt_putid (value, set->page_no);
2011
2012         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2013                 return bt->err;
2014
2015         // switch fence for right block of larger keys to new right page
2016
2017         ptr = (BtKey*)rightkey;
2018         bt_putid (value, right->page_no);
2019
2020         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2021                 return bt->err;
2022
2023         bt_unlockpage (BtLockParent, set->latch);
2024         bt_unpinlatch (set->latch);
2025
2026         bt_unlockpage (BtLockParent, right->latch);
2027         bt_unpinlatch (right->latch);
2028         return 0;
2029 }
2030
2031 //      install new key and value onto page
2032 //      page must already be checked for
2033 //      adequate space
2034
2035 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2036 {
2037 uint idx, librarian;
2038 BtSlot *node;
2039 BtKey *ptr;
2040 BtVal *val;
2041
2042         //      if found slot > desired slot and previous slot
2043         //      is a librarian slot, use it
2044
2045         if( slot > 1 )
2046           if( slotptr(set->page, slot-1)->type == Librarian )
2047                 slot--;
2048
2049         // copy value onto page
2050
2051         set->page->min -= vallen + sizeof(BtVal);
2052         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2053         memcpy (val->value, value, vallen);
2054         val->len = vallen;
2055
2056         // copy key onto page
2057
2058         set->page->min -= keylen + sizeof(BtKey);
2059         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2060         memcpy (ptr->key, key, keylen);
2061         ptr->len = keylen;
2062         
2063         //      find first empty slot
2064
2065         for( idx = slot; idx < set->page->cnt; idx++ )
2066           if( slotptr(set->page, idx)->dead )
2067                 break;
2068
2069         // now insert key into array before slot
2070
2071         if( idx == set->page->cnt )
2072                 idx += 2, set->page->cnt += 2, librarian = 2;
2073         else
2074                 librarian = 1;
2075
2076         set->latch->dirty = 1;
2077         set->page->act++;
2078
2079         while( idx > slot + librarian - 1 )
2080                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2081
2082         //      add librarian slot
2083
2084         if( librarian > 1 ) {
2085                 node = slotptr(set->page, slot++);
2086                 node->off = set->page->min;
2087                 node->type = Librarian;
2088                 node->dead = 1;
2089         }
2090
2091         //      fill in new slot
2092
2093         node = slotptr(set->page, slot);
2094         node->off = set->page->min;
2095         node->type = type;
2096         node->dead = 0;
2097
2098         bt_unlockpage (BtLockWrite, set->latch);
2099         bt_unpinlatch (set->latch);
2100         return 0;
2101 }
2102
2103 //  Insert new key into the btree at given level.
2104 //      either add a new key or update/add an existing one
2105
2106 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2107 {
2108 unsigned char newkey[BT_keyarray];
2109 uint slot, idx, len;
2110 BtPageSet set[1];
2111 BtKey *ptr, *ins;
2112 uid sequence;
2113 BtVal *val;
2114 uint type;
2115
2116   // set up the key we're working on
2117
2118   ins = (BtKey*)newkey;
2119   memcpy (ins->key, key, keylen);
2120   ins->len = keylen;
2121
2122   // is this a non-unique index value?
2123
2124   if( unique )
2125         type = Unique;
2126   else {
2127         type = Duplicate;
2128         sequence = bt_newdup (bt);
2129         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2130         ins->len += BtId;
2131   }
2132
2133   while( 1 ) { // find the page and slot for the current key
2134         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2135                 ptr = keyptr(set->page, slot);
2136         else {
2137                 if( !bt->err )
2138                         bt->err = BTERR_ovflw;
2139                 return bt->err;
2140         }
2141
2142         // if librarian slot == found slot, advance to real slot
2143
2144         if( slotptr(set->page, slot)->type == Librarian )
2145           if( !keycmp (ptr, key, keylen) )
2146                 ptr = keyptr(set->page, ++slot);
2147
2148         len = ptr->len;
2149
2150         if( slotptr(set->page, slot)->type == Duplicate )
2151                 len -= BtId;
2152
2153         //  if inserting a duplicate key or unique key
2154         //      check for adequate space on the page
2155         //      and insert the new key before slot.
2156
2157         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2158           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2159                 if( bt_splitpage (bt, set) )
2160                   return bt->err;
2161                 else
2162                   continue;
2163
2164           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type);
2165         }
2166
2167         // if key already exists, update value and return
2168
2169         val = valptr(set->page, slot);
2170
2171         if( val->len >= vallen ) {
2172                 if( slotptr(set->page, slot)->dead )
2173                         set->page->act++;
2174                 set->page->garbage += val->len - vallen;
2175                 set->latch->dirty = 1;
2176                 slotptr(set->page, slot)->dead = 0;
2177                 val->len = vallen;
2178                 memcpy (val->value, value, vallen);
2179                 bt_unlockpage(BtLockWrite, set->latch);
2180                 bt_unpinlatch (set->latch);
2181                 return 0;
2182         }
2183
2184         //      new update value doesn't fit in existing value area
2185
2186         if( !slotptr(set->page, slot)->dead )
2187                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2188         else {
2189                 slotptr(set->page, slot)->dead = 0;
2190                 set->page->act++;
2191         }
2192
2193         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2194           if( bt_splitpage (bt, set) )
2195                 return bt->err;
2196           else
2197                 continue;
2198
2199         set->page->min -= vallen + sizeof(BtVal);
2200         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2201         memcpy (val->value, value, vallen);
2202         val->len = vallen;
2203
2204         set->latch->dirty = 1;
2205         set->page->min -= keylen + sizeof(BtKey);
2206         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2207         memcpy (ptr->key, key, keylen);
2208         ptr->len = keylen;
2209         
2210         slotptr(set->page, slot)->off = set->page->min;
2211         bt_unlockpage(BtLockWrite, set->latch);
2212         bt_unpinlatch (set->latch);
2213         return 0;
2214   }
2215   return 0;
2216 }
2217
2218 //      set cursor to highest slot on highest page
2219
2220 uint bt_lastkey (BtDb *bt)
2221 {
2222 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2223 BtPageSet set[1];
2224 uint slot;
2225
2226         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2227                 set->page = bt_mappage (bt, set->latch);
2228         else
2229                 return 0;
2230
2231     bt_lockpage(BtLockRead, set->latch);
2232
2233         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2234         slot = set->page->cnt;
2235
2236     bt_unlockpage(BtLockRead, set->latch);
2237         bt_unpinlatch (set->latch);
2238         return slot;
2239 }
2240
2241 //      return previous slot on cursor page
2242
2243 uint bt_prevkey (BtDb *bt, uint slot)
2244 {
2245 BtPageSet set[1];
2246 uid left;
2247
2248         if( --slot )
2249                 return slot;
2250
2251         if( left = bt_getid(bt->cursor->left) )
2252                 bt->cursor_page = left;
2253         else
2254                 return 0;
2255
2256         if( set->latch = bt_pinlatch (bt, left, 1) )
2257                 set->page = bt_mappage (bt, set->latch);
2258         else
2259                 return 0;
2260
2261     bt_lockpage(BtLockRead, set->latch);
2262         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2263         bt_unlockpage(BtLockRead, set->latch);
2264         bt_unpinlatch (set->latch);
2265         return bt->cursor->cnt;
2266 }
2267
2268 //  return next slot on cursor page
2269 //  or slide cursor right into next page
2270
2271 uint bt_nextkey (BtDb *bt, uint slot)
2272 {
2273 BtPageSet set[1];
2274 uid right;
2275
2276   do {
2277         right = bt_getid(bt->cursor->right);
2278
2279         while( slot++ < bt->cursor->cnt )
2280           if( slotptr(bt->cursor,slot)->dead )
2281                 continue;
2282           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2283                 return slot;
2284           else
2285                 break;
2286
2287         if( !right )
2288                 break;
2289
2290         bt->cursor_page = right;
2291
2292         if( set->latch = bt_pinlatch (bt, right, 1) )
2293                 set->page = bt_mappage (bt, set->latch);
2294         else
2295                 return 0;
2296
2297     bt_lockpage(BtLockRead, set->latch);
2298
2299         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2300
2301         bt_unlockpage(BtLockRead, set->latch);
2302         bt_unpinlatch (set->latch);
2303         slot = 0;
2304
2305   } while( 1 );
2306
2307   return bt->err = 0;
2308 }
2309
2310 //  cache page of keys into cursor and return starting slot for given key
2311
2312 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
2313 {
2314 BtPageSet set[1];
2315 uint slot;
2316
2317         // cache page for retrieval
2318
2319         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
2320           memcpy (bt->cursor, set->page, bt->mgr->page_size);
2321         else
2322           return 0;
2323
2324         bt->cursor_page = set->page_no;
2325
2326         bt_unlockpage(BtLockRead, set->latch);
2327         bt_unpinlatch (set->latch);
2328         return slot;
2329 }
2330
2331 BtKey *bt_key(BtDb *bt, uint slot)
2332 {
2333         return keyptr(bt->cursor, slot);
2334 }
2335
2336 BtVal *bt_val(BtDb *bt, uint slot)
2337 {
2338         return valptr(bt->cursor,slot);
2339 }
2340
2341 #ifdef STANDALONE
2342
2343 #ifndef unix
2344 double getCpuTime(int type)
2345 {
2346 FILETIME crtime[1];
2347 FILETIME xittime[1];
2348 FILETIME systime[1];
2349 FILETIME usrtime[1];
2350 SYSTEMTIME timeconv[1];
2351 double ans = 0;
2352
2353         memset (timeconv, 0, sizeof(SYSTEMTIME));
2354
2355         switch( type ) {
2356         case 0:
2357                 GetSystemTimeAsFileTime (xittime);
2358                 FileTimeToSystemTime (xittime, timeconv);
2359                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
2360                 break;
2361         case 1:
2362                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2363                 FileTimeToSystemTime (usrtime, timeconv);
2364                 break;
2365         case 2:
2366                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
2367                 FileTimeToSystemTime (systime, timeconv);
2368                 break;
2369         }
2370
2371         ans += (double)timeconv->wHour * 3600;
2372         ans += (double)timeconv->wMinute * 60;
2373         ans += (double)timeconv->wSecond;
2374         ans += (double)timeconv->wMilliseconds / 1000;
2375         return ans;
2376 }
2377 #else
2378 #include <time.h>
2379 #include <sys/resource.h>
2380
2381 double getCpuTime(int type)
2382 {
2383 struct rusage used[1];
2384 struct timeval tv[1];
2385
2386         switch( type ) {
2387         case 0:
2388                 gettimeofday(tv, NULL);
2389                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
2390
2391         case 1:
2392                 getrusage(RUSAGE_SELF, used);
2393                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
2394
2395         case 2:
2396                 getrusage(RUSAGE_SELF, used);
2397                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
2398         }
2399
2400         return 0;
2401 }
2402 #endif
2403
2404 void bt_poolaudit (BtMgr *mgr)
2405 {
2406 BtLatchSet *latch;
2407 uint slot = 0;
2408
2409         while( slot++ < mgr->latchdeployed ) {
2410                 latch = mgr->latchsets + slot;
2411
2412                 if( *latch->readwr->rin & MASK )
2413                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
2414                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2415
2416                 if( *latch->access->rin & MASK )
2417                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
2418                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2419
2420                 if( *latch->parent->rin & MASK )
2421                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
2422                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2423
2424                 if( latch->pin & ~CLOCK_bit ) {
2425                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
2426                         latch->pin = 0;
2427                 }
2428         }
2429 }
2430
2431 uint bt_latchaudit (BtDb *bt)
2432 {
2433 ushort idx, hashidx;
2434 uid next, page_no;
2435 BtLatchSet *latch;
2436 uint cnt = 0;
2437 BtKey *ptr;
2438
2439         if( *(ushort *)(bt->mgr->lock) )
2440                 fprintf(stderr, "Alloc page locked\n");
2441         *(ushort *)(bt->mgr->lock) = 0;
2442
2443         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
2444                 latch = bt->mgr->latchsets + idx;
2445                 if( *latch->readwr->rin & MASK )
2446                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
2447                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
2448
2449                 if( *latch->access->rin & MASK )
2450                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
2451                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
2452
2453                 if( *latch->parent->rin & MASK )
2454                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
2455                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
2456
2457                 if( latch->pin ) {
2458                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2459                         latch->pin = 0;
2460                 }
2461         }
2462
2463         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
2464           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
2465                         fprintf(stderr, "hash entry %d locked\n", hashidx);
2466
2467           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
2468
2469           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
2470                 latch = bt->mgr->latchsets + idx;
2471                 if( latch->pin )
2472                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
2473           } while( idx = latch->next );
2474         }
2475
2476         page_no = LEAF_page;
2477
2478         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2479         uid off = page_no << bt->mgr->page_bits;
2480 #ifdef unix
2481           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
2482 #else
2483         DWORD amt[1];
2484
2485           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
2486
2487           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
2488                 return bt->err = BTERR_map;
2489
2490           if( *amt <  bt->mgr->page_size )
2491                 return bt->err = BTERR_map;
2492 #endif
2493                 if( !bt->frame->free && !bt->frame->lvl )
2494                         cnt += bt->frame->act;
2495                 page_no++;
2496         }
2497                 
2498         cnt--;  // remove stopper key
2499         fprintf(stderr, " Total keys read %d\n", cnt);
2500
2501         bt_close (bt);
2502         return 0;
2503 }
2504
2505 typedef struct {
2506         char idx;
2507         char *type;
2508         char *infile;
2509         BtMgr *mgr;
2510         int num;
2511 } ThreadArg;
2512
2513 //  standalone program to index file of keys
2514 //  then list them onto std-out
2515
2516 #ifdef unix
2517 void *index_file (void *arg)
2518 #else
2519 uint __stdcall index_file (void *arg)
2520 #endif
2521 {
2522 int line = 0, found = 0, cnt = 0, unique;
2523 uid next, page_no = LEAF_page;  // start on first page of leaves
2524 unsigned char key[BT_maxkey];
2525 ThreadArg *args = arg;
2526 int ch, len = 0, slot;
2527 BtPageSet set[1];
2528 BtKey *ptr;
2529 BtVal *val;
2530 BtDb *bt;
2531 FILE *in;
2532
2533         bt = bt_open (args->mgr);
2534
2535         unique = (args->type[1] | 0x20) != 'd';
2536
2537         switch(args->type[0] | 0x20)
2538         {
2539         case 'a':
2540                 fprintf(stderr, "started latch mgr audit\n");
2541                 cnt = bt_latchaudit (bt);
2542                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
2543                 break;
2544
2545         case 'p':
2546                 fprintf(stderr, "started pennysort for %s\n", args->infile);
2547                 if( in = fopen (args->infile, "rb") )
2548                   while( ch = getc(in), ch != EOF )
2549                         if( ch == '\n' )
2550                         {
2551                           line++;
2552
2553                           if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, unique) )
2554                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2555                           len = 0;
2556                         }
2557                         else if( len < BT_maxkey )
2558                                 key[len++] = ch;
2559                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2560                 break;
2561
2562         case 'w':
2563                 fprintf(stderr, "started indexing for %s\n", args->infile);
2564                 if( in = fopen (args->infile, "rb") )
2565                   while( ch = getc(in), ch != EOF )
2566                         if( ch == '\n' )
2567                         {
2568                           line++;
2569
2570                           if( args->num == 1 )
2571                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2572
2573                           else if( args->num )
2574                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2575
2576                           if( bt_insertkey (bt, key, len, 0, NULL, 0, unique) )
2577                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2578                           len = 0;
2579                         }
2580                         else if( len < BT_maxkey )
2581                                 key[len++] = ch;
2582                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
2583                 break;
2584
2585         case 'd':
2586                 fprintf(stderr, "started deleting keys for %s\n", args->infile);
2587                 if( in = fopen (args->infile, "rb") )
2588                   while( ch = getc(in), ch != EOF )
2589                         if( ch == '\n' )
2590                         {
2591                           line++;
2592                           if( args->num == 1 )
2593                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2594
2595                           else if( args->num )
2596                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2597
2598                           if( bt_findkey (bt, key, len, NULL, 0) < 0 )
2599                                 fprintf(stderr, "Cannot find key for Line: %d\n", line), exit(0);
2600                           ptr = (BtKey*)(bt->key);
2601                           found++;
2602
2603                           if( bt_deletekey (bt, ptr->key, ptr->len, 0) )
2604                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
2605                           len = 0;
2606                         }
2607                         else if( len < BT_maxkey )
2608                                 key[len++] = ch;
2609                 fprintf(stderr, "finished %s for %d keys, %d found: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2610                 break;
2611
2612         case 'f':
2613                 fprintf(stderr, "started finding keys for %s\n", args->infile);
2614                 if( in = fopen (args->infile, "rb") )
2615                   while( ch = getc(in), ch != EOF )
2616                         if( ch == '\n' )
2617                         {
2618                           line++;
2619                           if( args->num == 1 )
2620                                 sprintf((char *)key+len, "%.9d", 1000000000 - line), len += 9;
2621
2622                           else if( args->num )
2623                                 sprintf((char *)key+len, "%.9d", line + args->idx * args->num), len += 9;
2624
2625                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
2626                                 found++;
2627                           else if( bt->err )
2628                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
2629                           len = 0;
2630                         }
2631                         else if( len < BT_maxkey )
2632                                 key[len++] = ch;
2633                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
2634                 break;
2635
2636         case 's':
2637                 fprintf(stderr, "started scanning\n");
2638                 do {
2639                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2640                                 set->page = bt_mappage (bt, set->latch);
2641                         else
2642                                 fprintf(stderr, "unable to obtain latch"), exit(1);
2643                         bt_lockpage (BtLockRead, set->latch);
2644                         next = bt_getid (set->page->right);
2645
2646                         for( slot = 0; slot++ < set->page->cnt; )
2647                          if( next || slot < set->page->cnt )
2648                           if( !slotptr(set->page, slot)->dead ) {
2649                                 ptr = keyptr(set->page, slot);
2650                                 len = ptr->len;
2651
2652                             if( slotptr(set->page, slot)->type == Duplicate )
2653                                         len -= BtId;
2654
2655                                 fwrite (ptr->key, len, 1, stdout);
2656                                 val = valptr(set->page, slot);
2657                                 fwrite (val->value, val->len, 1, stdout);
2658                                 fputc ('\n', stdout);
2659                                 cnt++;
2660                            }
2661
2662                         bt_unlockpage (BtLockRead, set->latch);
2663                         bt_unpinlatch (set->latch);
2664                 } while( page_no = next );
2665
2666                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2667                 break;
2668
2669         case 'r':
2670                 fprintf(stderr, "started reverse scan\n");
2671                 if( slot = bt_lastkey (bt) )
2672                    while( slot = bt_prevkey (bt, slot) ) {
2673                         if( slotptr(bt->cursor, slot)->dead )
2674                           continue;
2675
2676                         ptr = keyptr(bt->cursor, slot);
2677                         len = ptr->len;
2678
2679                         if( slotptr(bt->cursor, slot)->type == Duplicate )
2680                                 len -= BtId;
2681
2682                         fwrite (ptr->key, len, 1, stdout);
2683                         val = valptr(bt->cursor, slot);
2684                         fwrite (val->value, val->len, 1, stdout);
2685                         fputc ('\n', stdout);
2686                         cnt++;
2687                   }
2688
2689                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2690                 break;
2691
2692         case 'c':
2693 #ifdef unix
2694                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
2695 #endif
2696                 fprintf(stderr, "started counting\n");
2697                 page_no = LEAF_page;
2698
2699                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
2700                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
2701                                 break;
2702
2703                         if( !bt->frame->free && !bt->frame->lvl )
2704                                 cnt += bt->frame->act;
2705
2706                         bt->reads++;
2707                         page_no++;
2708                 }
2709                 
2710                 cnt--;  // remove stopper key
2711                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
2712                 break;
2713         }
2714
2715         bt_close (bt);
2716 #ifdef unix
2717         return NULL;
2718 #else
2719         return 0;
2720 #endif
2721 }
2722
2723 typedef struct timeval timer;
2724
2725 int main (int argc, char **argv)
2726 {
2727 int idx, cnt, len, slot, err;
2728 int segsize, bits = 16;
2729 double start, stop;
2730 #ifdef unix
2731 pthread_t *threads;
2732 #else
2733 HANDLE *threads;
2734 #endif
2735 ThreadArg *args;
2736 uint poolsize = 0;
2737 float elapsed;
2738 int num = 0;
2739 char key[1];
2740 BtMgr *mgr;
2741 BtKey *ptr;
2742 BtDb *bt;
2743
2744         if( argc < 3 ) {
2745                 fprintf (stderr, "Usage: %s idx_file Read/Write/Scan/Delete/Find [page_bits buffer_pool_size line_numbers src_file1 src_file2 ... ]\n", argv[0]);
2746                 fprintf (stderr, "  where page_bits is the page size in bits\n");
2747                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
2748                 fprintf (stderr, "  line_numbers = 1 to append line numbers to keys\n");
2749                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
2750                 exit(0);
2751         }
2752
2753         start = getCpuTime(0);
2754
2755         if( argc > 3 )
2756                 bits = atoi(argv[3]);
2757
2758         if( argc > 4 )
2759                 poolsize = atoi(argv[4]);
2760
2761         if( !poolsize )
2762                 fprintf (stderr, "Warning: no mapped_pool\n");
2763
2764         if( argc > 5 )
2765                 num = atoi(argv[5]);
2766
2767         cnt = argc - 6;
2768 #ifdef unix
2769         threads = malloc (cnt * sizeof(pthread_t));
2770 #else
2771         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
2772 #endif
2773         args = malloc (cnt * sizeof(ThreadArg));
2774
2775         mgr = bt_mgr ((argv[1]), bits, poolsize);
2776
2777         if( !mgr ) {
2778                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
2779                 exit (1);
2780         }
2781
2782         //      fire off threads
2783
2784         for( idx = 0; idx < cnt; idx++ ) {
2785                 args[idx].infile = argv[idx + 6];
2786                 args[idx].type = argv[2];
2787                 args[idx].mgr = mgr;
2788                 args[idx].num = num;
2789                 args[idx].idx = idx;
2790 #ifdef unix
2791                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
2792                         fprintf(stderr, "Error creating thread %d\n", err);
2793 #else
2794                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
2795 #endif
2796         }
2797
2798         //      wait for termination
2799
2800 #ifdef unix
2801         for( idx = 0; idx < cnt; idx++ )
2802                 pthread_join (threads[idx], NULL);
2803 #else
2804         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
2805
2806         for( idx = 0; idx < cnt; idx++ )
2807                 CloseHandle(threads[idx]);
2808
2809 #endif
2810         bt_poolaudit(mgr);
2811         bt_mgrclose (mgr);
2812
2813         elapsed = getCpuTime(0) - start;
2814         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2815         elapsed = getCpuTime(1);
2816         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2817         elapsed = getCpuTime(2);
2818         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
2819 }
2820
2821 #endif  //STANDALONE