]> pd.if.org Git - btree/blob - threadskv8.c
6b377106ad1666078fbfa663a520df47ea0b9dc4
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and ACID batched key-value updates
9
10 // 26 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 /*
88 There are six lock types for each node in four independent sets: 
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
95 */
96
97 typedef enum{
98         BtLockAccess = 1,
99         BtLockDelete = 2,
100         BtLockRead   = 4,
101         BtLockWrite  = 8,
102         BtLockParent = 16,
103         BtLockAtomic = 32
104 } BtLock;
105
106 //      definition for phase-fair reader/writer lock implementation
107
108 typedef struct {
109         ushort rin[1];
110         ushort rout[1];
111         ushort ticket[1];
112         ushort serving[1];
113 } RWLock;
114
115 //      write only queue lock
116
117 typedef struct {
118         ushort ticket[1];
119         ushort serving[1];
120 } WOLock;
121
122 #define PHID 0x1
123 #define PRES 0x2
124 #define MASK 0x3
125 #define RINC 0x4
126
127 //      definition for spin latch implementation
128
129 // exclusive is set for write access
130 // share is count of read accessors
131 // grant write lock when share == 0
132
133 volatile typedef struct {
134         ushort exclusive:1;
135         ushort pending:1;
136         ushort share:14;
137 } BtSpinLatch;
138
139 #define XCL 1
140 #define PEND 2
141 #define BOTH 3
142 #define SHARE 4
143
144 //  hash table entries
145
146 typedef struct {
147         volatile uint slot;             // Latch table entry at head of chain
148         BtSpinLatch latch[1];
149 } BtHashEntry;
150
151 //      latch manager table structure
152
153 typedef struct {
154         uid page_no;                    // latch set page number
155         RWLock readwr[1];               // read/write page lock
156         RWLock access[1];               // Access Intent/Page delete
157         WOLock parent[1];               // Posting of fence key in parent
158         WOLock atomic[1];               // Atomic update in progress
159         uint split;                             // right split page atomic insert
160         uint entry;                             // entry slot in latch table
161         uint next;                              // next entry in hash table chain
162         uint prev;                              // prev entry in hash table chain
163         volatile ushort pin;    // number of outstanding threads
164         ushort dirty:1;                 // page in cache is dirty
165 } BtLatchSet;
166
167 //      Define the length of the page record numbers
168
169 #define BtId 6
170
171 //      Page key slot definition.
172
173 //      Keys are marked dead, but remain on the page until
174 //      it cleanup is called. The fence key (highest key) for
175 //      a leaf page is always present, even after cleanup.
176
177 //      Slot types
178
179 //      In addition to the Unique keys that occupy slots
180 //      there are Librarian and Duplicate key
181 //      slots occupying the key slot array.
182
183 //      The Librarian slots are dead keys that
184 //      serve as filler, available to add new Unique
185 //      or Dup slots that are inserted into the B-tree.
186
187 //      The Duplicate slots have had their key bytes extended
188 //      by 6 bytes to contain a binary duplicate key uniqueifier.
189
190 typedef enum {
191         Unique,
192         Librarian,
193         Duplicate,
194         Delete,
195         Update
196 } BtSlotType;
197
198 typedef struct {
199         uint off:BT_maxbits;    // page offset for key start
200         uint type:3;                    // type of slot
201         uint dead:1;                    // set for deleted slot
202 } BtSlot;
203
204 //      The key structure occupies space at the upper end of
205 //      each page.  It's a length byte followed by the key
206 //      bytes.
207
208 typedef struct {
209         unsigned char len;              // this can be changed to a ushort or uint
210         unsigned char key[0];
211 } BtKey;
212
213 //      the value structure also occupies space at the upper
214 //      end of the page. Each key is immediately followed by a value.
215
216 typedef struct {
217         unsigned char len;              // this can be changed to a ushort or uint
218         unsigned char value[0];
219 } BtVal;
220
221 #define BT_maxkey       255             // maximum number of bytes in a key
222 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
223
224 //      The first part of an index page.
225 //      It is immediately followed
226 //      by the BtSlot array of keys.
227
228 //      note that this structure size
229 //      must be a multiple of 8 bytes
230 //      in order to place dups correctly.
231
232 typedef struct BtPage_ {
233         uint cnt;                                       // count of keys in page
234         uint act;                                       // count of active keys
235         uint min;                                       // next key offset
236         uint garbage;                           // page garbage in bytes
237         unsigned char bits:7;           // page size in bits
238         unsigned char free:1;           // page is on free chain
239         unsigned char lvl:7;            // level of page
240         unsigned char kill:1;           // page is being deleted
241         unsigned char right[BtId];      // page number to right
242         unsigned char left[BtId];       // page number to left
243         unsigned char filler[2];        // padding to multiple of 8
244 } *BtPage;
245
246 //  The loadpage interface object
247
248 typedef struct {
249         BtPage page;            // current page pointer
250         BtLatchSet *latch;      // current page latch set
251 } BtPageSet;
252
253 //      structure for latch manager on ALLOC_page
254
255 typedef struct {
256         struct BtPage_ alloc[1];        // next page_no in right ptr
257         unsigned long long dups[1];     // global duplicate key uniqueifier
258         unsigned char chain[BtId];      // head of free page_nos chain
259 } BtPageZero;
260
261 //      The object structure for Btree access
262
263 typedef struct {
264         uint page_size;                         // page size    
265         uint page_bits;                         // page size in bits    
266 #ifdef unix
267         int idx;
268 #else
269         HANDLE idx;
270 #endif
271         BtPageZero *pagezero;           // mapped allocation page
272         BtSpinLatch lock[1];            // allocation area lite latch
273         uint latchdeployed;                     // highest number of latch entries deployed
274         uint nlatchpage;                        // number of latch pages at BT_latch
275         uint latchtotal;                        // number of page latch entries
276         uint latchhash;                         // number of latch hash table slots
277         uint latchvictim;                       // next latch entry to examine
278         ushort thread_no[1];            // next thread number
279         BtHashEntry *hashtable;         // the buffer pool hash table entries
280         BtLatchSet *latchsets;          // mapped latch set from buffer pool
281         unsigned char *pagepool;        // mapped to the buffer pool pages
282 #ifndef unix
283         HANDLE halloc;                          // allocation handle
284         HANDLE hpool;                           // buffer pool handle
285 #endif
286 } BtMgr;
287
288 typedef struct {
289         BtMgr *mgr;                             // buffer manager for thread
290         BtPage cursor;                  // cached frame for start/next (never mapped)
291         BtPage frame;                   // spare frame for the page split (never mapped)
292         uid cursor_page;                                // current cursor page number   
293         unsigned char *mem;                             // frame, cursor, page memory buffer
294         unsigned char key[BT_keyarray]; // last found complete key
295         int found;                              // last delete or insert was found
296         int err;                                // last error
297         int reads, writes;              // number of reads and writes from the btree
298         ushort thread_no;               // thread number
299 } BtDb;
300
301 typedef enum {
302         BTERR_ok = 0,
303         BTERR_struct,
304         BTERR_ovflw,
305         BTERR_lock,
306         BTERR_map,
307         BTERR_read,
308         BTERR_wrt,
309         BTERR_atomic
310 } BTERR;
311
312 #define CLOCK_bit 0x8000
313
314 // B-Tree functions
315 extern void bt_close (BtDb *bt);
316 extern BtDb *bt_open (BtMgr *mgr);
317 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
318 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
319 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
320 extern BtKey *bt_foundkey (BtDb *bt);
321 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
322 extern uint bt_nextkey   (BtDb *bt, uint slot);
323
324 //      manager functions
325 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
326 void bt_mgrclose (BtMgr *mgr);
327
328 //  Helper functions to return slot values
329 //      from the cursor page.
330
331 extern BtKey *bt_key (BtDb *bt, uint slot);
332 extern BtVal *bt_val (BtDb *bt, uint slot);
333
334 //  The page is allocated from low and hi ends.
335 //  The key slots are allocated from the bottom,
336 //      while the text and value of the key
337 //  are allocated from the top.  When the two
338 //  areas meet, the page is split into two.
339
340 //  A key consists of a length byte, two bytes of
341 //  index number (0 - 65535), and up to 253 bytes
342 //  of key value.
343
344 //  Associated with each key is a value byte string
345 //      containing any value desired.
346
347 //  The b-tree root is always located at page 1.
348 //      The first leaf page of level zero is always
349 //      located on page 2.
350
351 //      The b-tree pages are linked with next
352 //      pointers to facilitate enumerators,
353 //      and provide for concurrency.
354
355 //      When to root page fills, it is split in two and
356 //      the tree height is raised by a new root at page
357 //      one with two keys.
358
359 //      Deleted keys are marked with a dead bit until
360 //      page cleanup. The fence key for a leaf node is
361 //      always present
362
363 //  To achieve maximum concurrency one page is locked at a time
364 //  as the tree is traversed to find leaf key in question. The right
365 //  page numbers are used in cases where the page is being split,
366 //      or consolidated.
367
368 //  Page 0 is dedicated to lock for new page extensions,
369 //      and chains empty pages together for reuse. It also
370 //      contains the latch manager hash table.
371
372 //      The ParentModification lock on a node is obtained to serialize posting
373 //      or changing the fence key for a node.
374
375 //      Empty pages are chained together through the ALLOC page and reused.
376
377 //      Access macros to address slot and key values from the page
378 //      Page slots use 1 based indexing.
379
380 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
381 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
382 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
383
384 void bt_putid(unsigned char *dest, uid id)
385 {
386 int i = BtId;
387
388         while( i-- )
389                 dest[i] = (unsigned char)id, id >>= 8;
390 }
391
392 uid bt_getid(unsigned char *src)
393 {
394 uid id = 0;
395 int i;
396
397         for( i = 0; i < BtId; i++ )
398                 id <<= 8, id |= *src++; 
399
400         return id;
401 }
402
403 uid bt_newdup (BtDb *bt)
404 {
405 #ifdef unix
406         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
407 #else
408         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
409 #endif
410 }
411
412 //      Write-Only Queue Lock
413
414 void WriteOLock (WOLock *lock)
415 {
416 ushort tix;
417 #ifdef unix
418         tix = __sync_fetch_and_add (lock->ticket, 1);
419 #else
420         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
421 #endif
422         // wait for our ticket to come up
423
424         while( tix != lock->serving[0] )
425 #ifdef unix
426                 sched_yield();
427 #else
428                 SwitchToThread ();
429 #endif
430 }
431
432 void WriteORelease (WOLock *lock)
433 {
434         lock->serving[0]++;
435 }
436
437 //      Phase-Fair reader/writer lock implementation
438
439 void WriteLock (RWLock *lock)
440 {
441 ushort w, r, tix;
442
443 #ifdef unix
444         tix = __sync_fetch_and_add (lock->ticket, 1);
445 #else
446         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
447 #endif
448         // wait for our ticket to come up
449
450         while( tix != lock->serving[0] )
451 #ifdef unix
452                 sched_yield();
453 #else
454                 SwitchToThread ();
455 #endif
456
457         w = PRES | (tix & PHID);
458 #ifdef  unix
459         r = __sync_fetch_and_add (lock->rin, w);
460 #else
461         r = _InterlockedExchangeAdd16 (lock->rin, w);
462 #endif
463         while( r != *lock->rout )
464 #ifdef unix
465                 sched_yield();
466 #else
467                 SwitchToThread();
468 #endif
469 }
470
471 void WriteRelease (RWLock *lock)
472 {
473 #ifdef unix
474         __sync_fetch_and_and (lock->rin, ~MASK);
475 #else
476         _InterlockedAnd16 (lock->rin, ~MASK);
477 #endif
478         lock->serving[0]++;
479 }
480
481 void ReadLock (RWLock *lock)
482 {
483 ushort w;
484 #ifdef unix
485         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
486 #else
487         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
488 #endif
489         if( w )
490           while( w == (*lock->rin & MASK) )
491 #ifdef unix
492                 sched_yield ();
493 #else
494                 SwitchToThread ();
495 #endif
496 }
497
498 void ReadRelease (RWLock *lock)
499 {
500 #ifdef unix
501         __sync_fetch_and_add (lock->rout, RINC);
502 #else
503         _InterlockedExchangeAdd16 (lock->rout, RINC);
504 #endif
505 }
506
507 //      Spin Latch Manager
508
509 //      wait until write lock mode is clear
510 //      and add 1 to the share count
511
512 void bt_spinreadlock(BtSpinLatch *latch)
513 {
514 ushort prev;
515
516   do {
517 #ifdef unix
518         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
519 #else
520         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
521 #endif
522         //  see if exclusive request is granted or pending
523
524         if( !(prev & BOTH) )
525                 return;
526 #ifdef unix
527         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
528 #else
529         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
530 #endif
531 #ifdef  unix
532   } while( sched_yield(), 1 );
533 #else
534   } while( SwitchToThread(), 1 );
535 #endif
536 }
537
538 //      wait for other read and write latches to relinquish
539
540 void bt_spinwritelock(BtSpinLatch *latch)
541 {
542 ushort prev;
543
544   do {
545 #ifdef  unix
546         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
547 #else
548         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
549 #endif
550         if( !(prev & XCL) )
551           if( !(prev & ~BOTH) )
552                 return;
553           else
554 #ifdef unix
555                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
556 #else
557                 _InterlockedAnd16((ushort *)latch, ~XCL);
558 #endif
559 #ifdef  unix
560   } while( sched_yield(), 1 );
561 #else
562   } while( SwitchToThread(), 1 );
563 #endif
564 }
565
566 //      try to obtain write lock
567
568 //      return 1 if obtained,
569 //              0 otherwise
570
571 int bt_spinwritetry(BtSpinLatch *latch)
572 {
573 ushort prev;
574
575 #ifdef  unix
576         prev = __sync_fetch_and_or((ushort *)latch, XCL);
577 #else
578         prev = _InterlockedOr16((ushort *)latch, XCL);
579 #endif
580         //      take write access if all bits are clear
581
582         if( !(prev & XCL) )
583           if( !(prev & ~BOTH) )
584                 return 1;
585           else
586 #ifdef unix
587                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
588 #else
589                 _InterlockedAnd16((ushort *)latch, ~XCL);
590 #endif
591         return 0;
592 }
593
594 //      clear write mode
595
596 void bt_spinreleasewrite(BtSpinLatch *latch)
597 {
598 #ifdef unix
599         __sync_fetch_and_and((ushort *)latch, ~BOTH);
600 #else
601         _InterlockedAnd16((ushort *)latch, ~BOTH);
602 #endif
603 }
604
605 //      decrement reader count
606
607 void bt_spinreleaseread(BtSpinLatch *latch)
608 {
609 #ifdef unix
610         __sync_fetch_and_add((ushort *)latch, -SHARE);
611 #else
612         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
613 #endif
614 }
615
616 //      read page from permanent location in Btree file
617
618 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
619 {
620 off64_t off = page_no << mgr->page_bits;
621
622 #ifdef unix
623         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
624                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
625                 return BTERR_read;
626         }
627 #else
628 OVERLAPPED ovl[1];
629 uint amt[1];
630
631         memset (ovl, 0, sizeof(OVERLAPPED));
632         ovl->Offset = off;
633         ovl->OffsetHigh = off >> 32;
634
635         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
636                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
637                 return BTERR_read;
638         }
639         if( *amt <  mgr->page_size ) {
640                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
641                 return BTERR_read;
642         }
643 #endif
644         return 0;
645 }
646
647 //      write page to permanent location in Btree file
648 //      clear the dirty bit
649
650 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
651 {
652 off64_t off = page_no << mgr->page_bits;
653
654 #ifdef unix
655         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
656                 return BTERR_wrt;
657 #else
658 OVERLAPPED ovl[1];
659 uint amt[1];
660
661         memset (ovl, 0, sizeof(OVERLAPPED));
662         ovl->Offset = off;
663         ovl->OffsetHigh = off >> 32;
664
665         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
666                 return BTERR_wrt;
667
668         if( *amt <  mgr->page_size )
669                 return BTERR_wrt;
670 #endif
671         return 0;
672 }
673
674 //      link latch table entry into head of latch hash table
675
676 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
677 {
678 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
679 BtLatchSet *latch = bt->mgr->latchsets + slot;
680
681         if( latch->next = bt->mgr->hashtable[hashidx].slot )
682                 bt->mgr->latchsets[latch->next].prev = slot;
683
684         bt->mgr->hashtable[hashidx].slot = slot;
685         latch->page_no = page_no;
686         latch->entry = slot;
687         latch->split = 0;
688         latch->prev = 0;
689         latch->pin = 1;
690
691         if( loadit )
692           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
693                 return bt->err;
694           else
695                 bt->reads++;
696
697         return bt->err = 0;
698 }
699
700 //      set CLOCK bit in latch
701 //      decrement pin count
702
703 void bt_unpinlatch (BtLatchSet *latch)
704 {
705 #ifdef unix
706         if( ~latch->pin & CLOCK_bit )
707                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
708         __sync_fetch_and_add(&latch->pin, -1);
709 #else
710         if( ~latch->pin & CLOCK_bit )
711                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
712         _InterlockedDecrement16 (&latch->pin);
713 #endif
714 }
715
716 //  return the btree cached page address
717
718 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
719 {
720 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
721
722         return page;
723 }
724
725 //      find existing latchset or inspire new one
726 //      return with latchset pinned
727
728 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
729 {
730 uint hashidx = page_no % bt->mgr->latchhash;
731 BtLatchSet *latch;
732 uint slot, idx;
733 uint lvl, cnt;
734 off64_t off;
735 uint amt[1];
736 BtPage page;
737
738   //  try to find our entry
739
740   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
741
742   if( slot = bt->mgr->hashtable[hashidx].slot ) do
743   {
744         latch = bt->mgr->latchsets + slot;
745         if( page_no == latch->page_no )
746                 break;
747   } while( slot = latch->next );
748
749   //  found our entry
750   //  increment clock
751
752   if( slot ) {
753         latch = bt->mgr->latchsets + slot;
754 #ifdef unix
755         __sync_fetch_and_add(&latch->pin, 1);
756 #else
757         _InterlockedIncrement16 (&latch->pin);
758 #endif
759         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
760         return latch;
761   }
762
763         //  see if there are any unused pool entries
764 #ifdef unix
765         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
766 #else
767         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
768 #endif
769
770         if( slot < bt->mgr->latchtotal ) {
771                 latch = bt->mgr->latchsets + slot;
772                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
773                         return NULL;
774                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
775                 return latch;
776         }
777
778 #ifdef unix
779         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
780 #else
781         _InterlockedDecrement (&bt->mgr->latchdeployed);
782 #endif
783   //  find and reuse previous entry on victim
784
785   while( 1 ) {
786 #ifdef unix
787         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
788 #else
789         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
790 #endif
791         // try to get write lock on hash chain
792         //      skip entry if not obtained
793         //      or has outstanding pins
794
795         slot %= bt->mgr->latchtotal;
796
797         if( !slot )
798                 continue;
799
800         latch = bt->mgr->latchsets + slot;
801         idx = latch->page_no % bt->mgr->latchhash;
802
803         //      see we are on same chain as hashidx
804
805         if( idx == hashidx )
806                 continue;
807
808         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
809                 continue;
810
811         //  skip this slot if it is pinned
812         //      or the CLOCK bit is set
813
814         if( latch->pin ) {
815           if( latch->pin & CLOCK_bit ) {
816 #ifdef unix
817                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
818 #else
819                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
820 #endif
821           }
822           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
823           continue;
824         }
825
826         //  update permanent page area in btree from buffer pool
827
828         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
829
830         if( latch->dirty )
831           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
832                 return NULL;
833           else
834                 latch->dirty = 0, bt->writes++;
835
836         //  unlink our available slot from its hash chain
837
838         if( latch->prev )
839                 bt->mgr->latchsets[latch->prev].next = latch->next;
840         else
841                 bt->mgr->hashtable[idx].slot = latch->next;
842
843         if( latch->next )
844                 bt->mgr->latchsets[latch->next].prev = latch->prev;
845
846         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
847
848         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
849                 return NULL;
850
851         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
852         return latch;
853   }
854 }
855
856 void bt_mgrclose (BtMgr *mgr)
857 {
858 BtLatchSet *latch;
859 uint num = 0;
860 BtPage page;
861 uint slot;
862
863         //      flush dirty pool pages to the btree
864
865         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
866                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
867                 latch = mgr->latchsets + slot;
868
869                 if( latch->dirty ) {
870                         bt_writepage(mgr, page, latch->page_no);
871                         latch->dirty = 0, num++;
872                 }
873 //              madvise (page, mgr->page_size, MADV_DONTNEED);
874         }
875
876         fprintf(stderr, "%d buffer pool pages flushed\n", num);
877
878 #ifdef unix
879         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
880         munmap (mgr->pagezero, mgr->page_size);
881 #else
882         FlushViewOfFile(mgr->pagezero, 0);
883         UnmapViewOfFile(mgr->pagezero);
884         UnmapViewOfFile(mgr->hashtable);
885         CloseHandle(mgr->halloc);
886         CloseHandle(mgr->hpool);
887 #endif
888 #ifdef unix
889         close (mgr->idx);
890         free (mgr);
891 #else
892         FlushFileBuffers(mgr->idx);
893         CloseHandle(mgr->idx);
894         GlobalFree (mgr);
895 #endif
896 }
897
898 //      close and release memory
899
900 void bt_close (BtDb *bt)
901 {
902 #ifdef unix
903         if( bt->mem )
904                 free (bt->mem);
905 #else
906         if( bt->mem)
907                 VirtualFree (bt->mem, 0, MEM_RELEASE);
908 #endif
909         free (bt);
910 }
911
912 //  open/create new btree buffer manager
913
914 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
915 //              size of page pool (e.g. 262144)
916
917 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
918 {
919 uint lvl, attr, last, slot, idx;
920 uint nlatchpage, latchhash;
921 unsigned char value[BtId];
922 int flag, initit = 0;
923 BtPageZero *pagezero;
924 off64_t size;
925 uint amt[1];
926 BtMgr* mgr;
927 BtKey* key;
928 BtVal *val;
929
930         // determine sanity of page size and buffer pool
931
932         if( bits > BT_maxbits )
933                 bits = BT_maxbits;
934         else if( bits < BT_minbits )
935                 bits = BT_minbits;
936
937         if( nodemax < 16 ) {
938                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
939                 return NULL;
940         }
941
942 #ifdef unix
943         mgr = calloc (1, sizeof(BtMgr));
944
945         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
946
947         if( mgr->idx == -1 ) {
948                 fprintf (stderr, "Unable to open btree file\n");
949                 return free(mgr), NULL;
950         }
951 #else
952         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
953         attr = FILE_ATTRIBUTE_NORMAL;
954         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
955
956         if( mgr->idx == INVALID_HANDLE_VALUE )
957                 return GlobalFree(mgr), NULL;
958 #endif
959
960 #ifdef unix
961         pagezero = valloc (BT_maxpage);
962         *amt = 0;
963
964         // read minimum page size to get root info
965         //      to support raw disk partition files
966         //      check if bits == 0 on the disk.
967
968         if( size = lseek (mgr->idx, 0L, 2) )
969                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
970                         if( pagezero->alloc->bits )
971                                 bits = pagezero->alloc->bits;
972                         else
973                                 initit = 1;
974                 else
975                         return free(mgr), free(pagezero), NULL;
976         else
977                 initit = 1;
978 #else
979         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
980         size = GetFileSize(mgr->idx, amt);
981
982         if( size || *amt ) {
983                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
984                         return bt_mgrclose (mgr), NULL;
985                 bits = pagezero->alloc->bits;
986         } else
987                 initit = 1;
988 #endif
989
990         mgr->page_size = 1 << bits;
991         mgr->page_bits = bits;
992
993         //  calculate number of latch hash table entries
994
995         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
996         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
997
998         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
999         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1000         mgr->latchtotal = nodemax;
1001
1002         if( !initit )
1003                 goto mgrlatch;
1004
1005         // initialize an empty b-tree with latch page, root page, page of leaves
1006         // and page(s) of latches and page pool cache
1007
1008         memset (pagezero, 0, 1 << bits);
1009         pagezero->alloc->bits = mgr->page_bits;
1010         bt_putid(pagezero->alloc->right, MIN_lvl+1);
1011
1012         //  initialize left-most LEAF page in
1013         //      alloc->left.
1014
1015         bt_putid (pagezero->alloc->left, LEAF_page);
1016
1017         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1018                 fprintf (stderr, "Unable to create btree page zero\n");
1019                 return bt_mgrclose (mgr), NULL;
1020         }
1021
1022         memset (pagezero, 0, 1 << bits);
1023         pagezero->alloc->bits = mgr->page_bits;
1024
1025         for( lvl=MIN_lvl; lvl--; ) {
1026                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1027                 key = keyptr(pagezero->alloc, 1);
1028                 key->len = 2;           // create stopper key
1029                 key->key[0] = 0xff;
1030                 key->key[1] = 0xff;
1031
1032                 bt_putid(value, MIN_lvl - lvl + 1);
1033                 val = valptr(pagezero->alloc, 1);
1034                 val->len = lvl ? BtId : 0;
1035                 memcpy (val->value, value, val->len);
1036
1037                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1038                 pagezero->alloc->lvl = lvl;
1039                 pagezero->alloc->cnt = 1;
1040                 pagezero->alloc->act = 1;
1041
1042                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1043                         fprintf (stderr, "Unable to create btree page zero\n");
1044                         return bt_mgrclose (mgr), NULL;
1045                 }
1046         }
1047
1048 mgrlatch:
1049 #ifdef unix
1050         free (pagezero);
1051 #else
1052         VirtualFree (pagezero, 0, MEM_RELEASE);
1053 #endif
1054 #ifdef unix
1055         // mlock the pagezero page
1056
1057         flag = PROT_READ | PROT_WRITE;
1058         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1059         if( mgr->pagezero == MAP_FAILED ) {
1060                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1061                 return bt_mgrclose (mgr), NULL;
1062         }
1063         mlock (mgr->pagezero, mgr->page_size);
1064
1065         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1066         if( mgr->hashtable == MAP_FAILED ) {
1067                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1068                 return bt_mgrclose (mgr), NULL;
1069         }
1070 #else
1071         flag = PAGE_READWRITE;
1072         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1073         if( !mgr->halloc ) {
1074                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1075                 return bt_mgrclose (mgr), NULL;
1076         }
1077
1078         flag = FILE_MAP_WRITE;
1079         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1080         if( !mgr->pagezero ) {
1081                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1082                 return bt_mgrclose (mgr), NULL;
1083         }
1084
1085         flag = PAGE_READWRITE;
1086         size = (uid)mgr->nlatchpage << mgr->page_bits;
1087         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1088         if( !mgr->hpool ) {
1089                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1090                 return bt_mgrclose (mgr), NULL;
1091         }
1092
1093         flag = FILE_MAP_WRITE;
1094         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1095         if( !mgr->hashtable ) {
1096                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1097                 return bt_mgrclose (mgr), NULL;
1098         }
1099 #endif
1100
1101         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1102         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1103
1104         return mgr;
1105 }
1106
1107 //      open BTree access method
1108 //      based on buffer manager
1109
1110 BtDb *bt_open (BtMgr *mgr)
1111 {
1112 BtDb *bt = malloc (sizeof(*bt));
1113
1114         memset (bt, 0, sizeof(*bt));
1115         bt->mgr = mgr;
1116 #ifdef unix
1117         bt->mem = valloc (2 *mgr->page_size);
1118 #else
1119         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1120 #endif
1121         bt->frame = (BtPage)bt->mem;
1122         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1123 #ifdef unix
1124         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1125 #else
1126         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1127 #endif
1128         return bt;
1129 }
1130
1131 //  compare two keys, return > 0, = 0, or < 0
1132 //  =0: keys are same
1133 //  -1: key2 > key1
1134 //  +1: key2 < key1
1135 //  as the comparison value
1136
1137 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1138 {
1139 uint len1 = key1->len;
1140 int ans;
1141
1142         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1143                 return ans;
1144
1145         if( len1 > len2 )
1146                 return 1;
1147         if( len1 < len2 )
1148                 return -1;
1149
1150         return 0;
1151 }
1152
1153 // place write, read, or parent lock on requested page_no.
1154
1155 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1156 {
1157         switch( mode ) {
1158         case BtLockRead:
1159                 ReadLock (latch->readwr);
1160                 break;
1161         case BtLockWrite:
1162                 WriteLock (latch->readwr);
1163                 break;
1164         case BtLockAccess:
1165                 ReadLock (latch->access);
1166                 break;
1167         case BtLockDelete:
1168                 WriteLock (latch->access);
1169                 break;
1170         case BtLockParent:
1171                 WriteOLock (latch->parent);
1172                 break;
1173         case BtLockAtomic:
1174                 WriteOLock (latch->atomic);
1175                 break;
1176         }
1177 }
1178
1179 // remove write, read, or parent lock on requested page
1180
1181 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1182 {
1183         switch( mode ) {
1184         case BtLockRead:
1185                 ReadRelease (latch->readwr);
1186                 break;
1187         case BtLockWrite:
1188                 WriteRelease (latch->readwr);
1189                 break;
1190         case BtLockAccess:
1191                 ReadRelease (latch->access);
1192                 break;
1193         case BtLockDelete:
1194                 WriteRelease (latch->access);
1195                 break;
1196         case BtLockParent:
1197                 WriteORelease (latch->parent);
1198                 break;
1199         case BtLockAtomic:
1200                 WriteORelease (latch->atomic);
1201                 break;
1202         }
1203 }
1204
1205 //      allocate a new page
1206 //      return with page latched, but unlocked.
1207
1208 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1209 {
1210 uid page_no;
1211 int blk;
1212
1213         //      lock allocation page
1214
1215         bt_spinwritelock(bt->mgr->lock);
1216
1217         // use empty chain first
1218         // else allocate empty page
1219
1220         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1221                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1222                         set->page = bt_mappage (bt, set->latch);
1223                 else
1224                         return bt->err = BTERR_struct, -1;
1225
1226                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1227                 bt_spinreleasewrite(bt->mgr->lock);
1228
1229                 memcpy (set->page, contents, bt->mgr->page_size);
1230                 set->latch->dirty = 1;
1231                 return 0;
1232         }
1233
1234         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1235         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1236
1237         // unlock allocation latch
1238
1239         bt_spinreleasewrite(bt->mgr->lock);
1240
1241         //      don't load cache from btree page
1242
1243         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1244                 set->page = bt_mappage (bt, set->latch);
1245         else
1246                 return bt->err = BTERR_struct;
1247
1248         memcpy (set->page, contents, bt->mgr->page_size);
1249         set->latch->dirty = 1;
1250         return 0;
1251 }
1252
1253 //  find slot in page for given key at a given level
1254
1255 int bt_findslot (BtPage page, unsigned char *key, uint len)
1256 {
1257 uint diff, higher = page->cnt, low = 1, slot;
1258 uint good = 0;
1259
1260         //        make stopper key an infinite fence value
1261
1262         if( bt_getid (page->right) )
1263                 higher++;
1264         else
1265                 good++;
1266
1267         //      low is the lowest candidate.
1268         //  loop ends when they meet
1269
1270         //  higher is already
1271         //      tested as .ge. the passed key.
1272
1273         while( diff = higher - low ) {
1274                 slot = low + ( diff >> 1 );
1275                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1276                         low = slot + 1;
1277                 else
1278                         higher = slot, good++;
1279         }
1280
1281         //      return zero if key is on right link page
1282
1283         return good ? higher : 0;
1284 }
1285
1286 //  find and load page at given level for given key
1287 //      leave page rd or wr locked as requested
1288
1289 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1290 {
1291 uid page_no = ROOT_page, prevpage = 0;
1292 uint drill = 0xff, slot;
1293 BtLatchSet *prevlatch;
1294 uint mode, prevmode;
1295
1296   //  start at root of btree and drill down
1297
1298   do {
1299         // determine lock mode of drill level
1300         mode = (drill == lvl) ? lock : BtLockRead; 
1301
1302         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1303           return 0;
1304
1305         // obtain access lock using lock chaining with Access mode
1306
1307         if( page_no > ROOT_page )
1308           bt_lockpage(bt, BtLockAccess, set->latch);
1309
1310         set->page = bt_mappage (bt, set->latch);
1311
1312         //      release & unpin parent or left sibling page
1313
1314         if( prevpage ) {
1315           bt_unlockpage(bt, prevmode, prevlatch);
1316           bt_unpinlatch (prevlatch);
1317           prevpage = 0;
1318         }
1319
1320         // obtain mode lock using lock chaining through AccessLock
1321
1322         bt_lockpage(bt, mode, set->latch);
1323
1324         if( set->page->free )
1325                 return bt->err = BTERR_struct, 0;
1326
1327         if( page_no > ROOT_page )
1328           bt_unlockpage(bt, BtLockAccess, set->latch);
1329
1330         // re-read and re-lock root after determining actual level of root
1331
1332         if( set->page->lvl != drill) {
1333                 if( set->latch->page_no != ROOT_page )
1334                         return bt->err = BTERR_struct, 0;
1335                         
1336                 drill = set->page->lvl;
1337
1338                 if( lock != BtLockRead && drill == lvl ) {
1339                   bt_unlockpage(bt, mode, set->latch);
1340                   bt_unpinlatch (set->latch);
1341                   continue;
1342                 }
1343         }
1344
1345         prevpage = set->latch->page_no;
1346         prevlatch = set->latch;
1347         prevmode = mode;
1348
1349         //  find key on page at this level
1350         //  and descend to requested level
1351
1352         if( !set->page->kill )
1353          if( slot = bt_findslot (set->page, key, len) ) {
1354           if( drill == lvl )
1355                 return slot;
1356
1357           // find next non-dead slot -- the fence key if nothing else
1358
1359           while( slotptr(set->page, slot)->dead )
1360                 if( slot++ < set->page->cnt )
1361                   continue;
1362                 else
1363                   return bt->err = BTERR_struct, 0;
1364
1365           page_no = bt_getid(valptr(set->page, slot)->value);
1366           drill--;
1367           continue;
1368          }
1369
1370         //  or slide right into next page
1371
1372         page_no = bt_getid(set->page->right);
1373   } while( page_no );
1374
1375   // return error on end of right chain
1376
1377   bt->err = BTERR_struct;
1378   return 0;     // return error
1379 }
1380
1381 //      return page to free list
1382 //      page must be delete & write locked
1383
1384 void bt_freepage (BtDb *bt, BtPageSet *set)
1385 {
1386         //      lock allocation page
1387
1388         bt_spinwritelock (bt->mgr->lock);
1389
1390         //      store chain
1391
1392         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1393         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1394         set->latch->dirty = 1;
1395         set->page->free = 1;
1396
1397         // unlock released page
1398
1399         bt_unlockpage (bt, BtLockDelete, set->latch);
1400         bt_unlockpage (bt, BtLockWrite, set->latch);
1401         bt_unpinlatch (set->latch);
1402
1403         // unlock allocation page
1404
1405         bt_spinreleasewrite (bt->mgr->lock);
1406 }
1407
1408 //      a fence key was deleted from a page
1409 //      push new fence value upwards
1410
1411 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1412 {
1413 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1414 unsigned char value[BtId];
1415 BtKey* ptr;
1416 uint idx;
1417
1418         //      remove the old fence value
1419
1420         ptr = keyptr(set->page, set->page->cnt);
1421         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1422         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1423         set->latch->dirty = 1;
1424
1425         //  cache new fence value
1426
1427         ptr = keyptr(set->page, set->page->cnt);
1428         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1429
1430         bt_lockpage (bt, BtLockParent, set->latch);
1431         bt_unlockpage (bt, BtLockWrite, set->latch);
1432
1433         //      insert new (now smaller) fence key
1434
1435         bt_putid (value, set->latch->page_no);
1436         ptr = (BtKey*)leftkey;
1437
1438         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1439           return bt->err;
1440
1441         //      now delete old fence key
1442
1443         ptr = (BtKey*)rightkey;
1444
1445         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1446                 return bt->err;
1447
1448         bt_unlockpage (bt, BtLockParent, set->latch);
1449         bt_unpinlatch(set->latch);
1450         return 0;
1451 }
1452
1453 //      root has a single child
1454 //      collapse a level from the tree
1455
1456 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1457 {
1458 BtPageSet child[1];
1459 uid page_no;
1460 uint idx;
1461
1462   // find the child entry and promote as new root contents
1463
1464   do {
1465         for( idx = 0; idx++ < root->page->cnt; )
1466           if( !slotptr(root->page, idx)->dead )
1467                 break;
1468
1469         page_no = bt_getid (valptr(root->page, idx)->value);
1470
1471         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1472                 child->page = bt_mappage (bt, child->latch);
1473         else
1474                 return bt->err;
1475
1476         bt_lockpage (bt, BtLockDelete, child->latch);
1477         bt_lockpage (bt, BtLockWrite, child->latch);
1478
1479         memcpy (root->page, child->page, bt->mgr->page_size);
1480         root->latch->dirty = 1;
1481
1482         bt_freepage (bt, child);
1483
1484   } while( root->page->lvl > 1 && root->page->act == 1 );
1485
1486   bt_unlockpage (bt, BtLockWrite, root->latch);
1487   bt_unpinlatch (root->latch);
1488   return 0;
1489 }
1490
1491 //  delete a page and manage keys
1492 //  call with page writelocked
1493 //      returns with page unpinned
1494
1495 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1496 {
1497 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1498 unsigned char value[BtId];
1499 uint lvl = set->page->lvl;
1500 BtPageSet right[1];
1501 uid page_no;
1502 BtKey *ptr;
1503
1504         //      cache copy of fence key
1505         //      to post in parent
1506
1507         ptr = keyptr(set->page, set->page->cnt);
1508         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1509
1510         //      obtain lock on right page
1511
1512         page_no = bt_getid(set->page->right);
1513
1514         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1515                 right->page = bt_mappage (bt, right->latch);
1516         else
1517                 return 0;
1518
1519         bt_lockpage (bt, BtLockWrite, right->latch);
1520
1521         // cache copy of key to update
1522
1523         ptr = keyptr(right->page, right->page->cnt);
1524         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1525
1526         if( right->page->kill )
1527                 return bt->err = BTERR_struct;
1528
1529         // pull contents of right peer into our empty page
1530
1531         memcpy (set->page, right->page, bt->mgr->page_size);
1532         set->latch->dirty = 1;
1533
1534         // mark right page deleted and point it to left page
1535         //      until we can post parent updates that remove access
1536         //      to the deleted page.
1537
1538         bt_putid (right->page->right, set->latch->page_no);
1539         right->latch->dirty = 1;
1540         right->page->kill = 1;
1541
1542         bt_lockpage (bt, BtLockParent, right->latch);
1543         bt_unlockpage (bt, BtLockWrite, right->latch);
1544
1545         bt_lockpage (bt, BtLockParent, set->latch);
1546         bt_unlockpage (bt, BtLockWrite, set->latch);
1547
1548         // redirect higher key directly to our new node contents
1549
1550         bt_putid (value, set->latch->page_no);
1551         ptr = (BtKey*)higherfence;
1552
1553         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1554           return bt->err;
1555
1556         //      delete old lower key to our node
1557
1558         ptr = (BtKey*)lowerfence;
1559
1560         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1561           return bt->err;
1562
1563         //      obtain delete and write locks to right node
1564
1565         bt_unlockpage (bt, BtLockParent, right->latch);
1566         bt_lockpage (bt, BtLockDelete, right->latch);
1567         bt_lockpage (bt, BtLockWrite, right->latch);
1568         bt_freepage (bt, right);
1569
1570         bt_unlockpage (bt, BtLockParent, set->latch);
1571         bt_unpinlatch (set->latch);
1572         bt->found = 1;
1573         return 0;
1574 }
1575
1576 //  find and delete key on page by marking delete flag bit
1577 //  if page becomes empty, delete it from the btree
1578
1579 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1580 {
1581 uint slot, idx, found, fence;
1582 BtPageSet set[1];
1583 BtKey *ptr;
1584 BtVal *val;
1585
1586         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1587                 ptr = keyptr(set->page, slot);
1588         else
1589                 return bt->err;
1590
1591         // if librarian slot, advance to real slot
1592
1593         if( slotptr(set->page, slot)->type == Librarian )
1594                 ptr = keyptr(set->page, ++slot);
1595
1596         fence = slot == set->page->cnt;
1597
1598         // if key is found delete it, otherwise ignore request
1599
1600         if( found = !keycmp (ptr, key, len) )
1601           if( found = slotptr(set->page, slot)->dead == 0 ) {
1602                 val = valptr(set->page,slot);
1603                 slotptr(set->page, slot)->dead = 1;
1604                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1605                 set->page->act--;
1606
1607                 // collapse empty slots beneath the fence
1608
1609                 while( idx = set->page->cnt - 1 )
1610                   if( slotptr(set->page, idx)->dead ) {
1611                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1612                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1613                   } else
1614                         break;
1615           }
1616
1617         //      did we delete a fence key in an upper level?
1618
1619         if( found && lvl && set->page->act && fence )
1620           if( bt_fixfence (bt, set, lvl) )
1621                 return bt->err;
1622           else
1623                 return bt->found = found, 0;
1624
1625         //      do we need to collapse root?
1626
1627         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1628           if( bt_collapseroot (bt, set) )
1629                 return bt->err;
1630           else
1631                 return bt->found = found, 0;
1632
1633         //      delete empty page
1634
1635         if( !set->page->act )
1636                 return bt_deletepage (bt, set);
1637
1638         set->latch->dirty = 1;
1639         bt_unlockpage(bt, BtLockWrite, set->latch);
1640         bt_unpinlatch (set->latch);
1641         return bt->found = found, 0;
1642 }
1643
1644 BtKey *bt_foundkey (BtDb *bt)
1645 {
1646         return (BtKey*)(bt->key);
1647 }
1648
1649 //      advance to next slot
1650
1651 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1652 {
1653 BtLatchSet *prevlatch;
1654 uid page_no;
1655
1656         if( slot < set->page->cnt )
1657                 return slot + 1;
1658
1659         prevlatch = set->latch;
1660
1661         if( page_no = bt_getid(set->page->right) )
1662           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1663                 set->page = bt_mappage (bt, set->latch);
1664           else
1665                 return 0;
1666         else
1667           return bt->err = BTERR_struct, 0;
1668
1669         // obtain access lock using lock chaining with Access mode
1670
1671         bt_lockpage(bt, BtLockAccess, set->latch);
1672
1673         bt_unlockpage(bt, BtLockRead, prevlatch);
1674         bt_unpinlatch (prevlatch);
1675
1676         bt_lockpage(bt, BtLockRead, set->latch);
1677         bt_unlockpage(bt, BtLockAccess, set->latch);
1678         return 1;
1679 }
1680
1681 //      find unique key or first duplicate key in
1682 //      leaf level and return number of value bytes
1683 //      or (-1) if not found.  Setup key for bt_foundkey
1684
1685 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1686 {
1687 BtPageSet set[1];
1688 uint len, slot;
1689 int ret = -1;
1690 BtKey *ptr;
1691 BtVal *val;
1692
1693   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1694    do {
1695         ptr = keyptr(set->page, slot);
1696
1697         //      skip librarian slot place holder
1698
1699         if( slotptr(set->page, slot)->type == Librarian )
1700                 ptr = keyptr(set->page, ++slot);
1701
1702         //      return actual key found
1703
1704         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1705         len = ptr->len;
1706
1707         if( slotptr(set->page, slot)->type == Duplicate )
1708                 len -= BtId;
1709
1710         //      not there if we reach the stopper key
1711
1712         if( slot == set->page->cnt )
1713           if( !bt_getid (set->page->right) )
1714                 break;
1715
1716         // if key exists, return >= 0 value bytes copied
1717         //      otherwise return (-1)
1718
1719         if( slotptr(set->page, slot)->dead )
1720                 continue;
1721
1722         if( keylen == len )
1723           if( !memcmp (ptr->key, key, len) ) {
1724                 val = valptr (set->page,slot);
1725                 if( valmax > val->len )
1726                         valmax = val->len;
1727                 memcpy (value, val->value, valmax);
1728                 ret = valmax;
1729           }
1730
1731         break;
1732
1733    } while( slot = bt_findnext (bt, set, slot) );
1734
1735   bt_unlockpage (bt, BtLockRead, set->latch);
1736   bt_unpinlatch (set->latch);
1737   return ret;
1738 }
1739
1740 //      check page for space available,
1741 //      clean if necessary and return
1742 //      0 - page needs splitting
1743 //      >0  new slot value
1744
1745 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1746 {
1747 uint nxt = bt->mgr->page_size;
1748 BtPage page = set->page;
1749 uint cnt = 0, idx = 0;
1750 uint max = page->cnt;
1751 uint newslot = max;
1752 BtKey *key;
1753 BtVal *val;
1754
1755         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1756                 return slot;
1757
1758         //      skip cleanup and proceed to split
1759         //      if there's not enough garbage
1760         //      to bother with.
1761
1762         if( page->garbage < nxt / 5 )
1763                 return 0;
1764
1765         memcpy (bt->frame, page, bt->mgr->page_size);
1766
1767         // skip page info and set rest of page to zero
1768
1769         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1770         set->latch->dirty = 1;
1771         page->garbage = 0;
1772         page->act = 0;
1773
1774         // clean up page first by
1775         // removing deleted keys
1776
1777         while( cnt++ < max ) {
1778                 if( cnt == slot )
1779                         newslot = idx + 2;
1780
1781                 if( cnt < max || bt->frame->lvl )
1782                   if( slotptr(bt->frame,cnt)->dead )
1783                         continue;
1784
1785                 // copy the value across
1786
1787                 val = valptr(bt->frame, cnt);
1788                 nxt -= val->len + sizeof(BtVal);
1789                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1790
1791                 // copy the key across
1792
1793                 key = keyptr(bt->frame, cnt);
1794                 nxt -= key->len + sizeof(BtKey);
1795                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1796
1797                 // make a librarian slot
1798
1799                 slotptr(page, ++idx)->off = nxt;
1800                 slotptr(page, idx)->type = Librarian;
1801                 slotptr(page, idx)->dead = 1;
1802
1803                 // set up the slot
1804
1805                 slotptr(page, ++idx)->off = nxt;
1806                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1807
1808                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1809                         page->act++;
1810         }
1811
1812         page->min = nxt;
1813         page->cnt = idx;
1814
1815         //      see if page has enough space now, or does it need splitting?
1816
1817         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1818                 return newslot;
1819
1820         return 0;
1821 }
1822
1823 // split the root and raise the height of the btree
1824
1825 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1826 {  
1827 unsigned char leftkey[BT_keyarray];
1828 uint nxt = bt->mgr->page_size;
1829 unsigned char value[BtId];
1830 BtPageSet left[1];
1831 uid left_page_no;
1832 BtKey *ptr;
1833 BtVal *val;
1834
1835         //      save left page fence key for new root
1836
1837         ptr = keyptr(root->page, root->page->cnt);
1838         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1839
1840         //  Obtain an empty page to use, and copy the current
1841         //  root contents into it, e.g. lower keys
1842
1843         if( bt_newpage(bt, left, root->page) )
1844                 return bt->err;
1845
1846         left_page_no = left->latch->page_no;
1847         bt_unpinlatch (left->latch);
1848
1849         // preserve the page info at the bottom
1850         // of higher keys and set rest to zero
1851
1852         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1853
1854         // insert stopper key at top of newroot page
1855         // and increase the root height
1856
1857         nxt -= BtId + sizeof(BtVal);
1858         bt_putid (value, right->page_no);
1859         val = (BtVal *)((unsigned char *)root->page + nxt);
1860         memcpy (val->value, value, BtId);
1861         val->len = BtId;
1862
1863         nxt -= 2 + sizeof(BtKey);
1864         slotptr(root->page, 2)->off = nxt;
1865         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1866         ptr->len = 2;
1867         ptr->key[0] = 0xff;
1868         ptr->key[1] = 0xff;
1869
1870         // insert lower keys page fence key on newroot page as first key
1871
1872         nxt -= BtId + sizeof(BtVal);
1873         bt_putid (value, left_page_no);
1874         val = (BtVal *)((unsigned char *)root->page + nxt);
1875         memcpy (val->value, value, BtId);
1876         val->len = BtId;
1877
1878         ptr = (BtKey *)leftkey;
1879         nxt -= ptr->len + sizeof(BtKey);
1880         slotptr(root->page, 1)->off = nxt;
1881         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1882         
1883         bt_putid(root->page->right, 0);
1884         root->page->min = nxt;          // reset lowest used offset and key count
1885         root->page->cnt = 2;
1886         root->page->act = 2;
1887         root->page->lvl++;
1888
1889         // release and unpin root pages
1890
1891         bt_unlockpage(bt, BtLockWrite, root->latch);
1892         bt_unpinlatch (root->latch);
1893
1894         bt_unpinlatch (right);
1895         return 0;
1896 }
1897
1898 //  split already locked full node
1899 //      leave it locked.
1900 //      return pool entry for new right
1901 //      page, unlocked
1902
1903 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1904 {
1905 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1906 uint lvl = set->page->lvl;
1907 BtPageSet right[1];
1908 BtKey *key, *ptr;
1909 BtVal *val, *src;
1910 uid right2;
1911 uint prev;
1912
1913         //  split higher half of keys to bt->frame
1914
1915         memset (bt->frame, 0, bt->mgr->page_size);
1916         max = set->page->cnt;
1917         cnt = max / 2;
1918         idx = 0;
1919
1920         while( cnt++ < max ) {
1921                 if( cnt < max || set->page->lvl )
1922                   if( slotptr(set->page, cnt)->dead )
1923                         continue;
1924
1925                 src = valptr(set->page, cnt);
1926                 nxt -= src->len + sizeof(BtVal);
1927                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1928
1929                 key = keyptr(set->page, cnt);
1930                 nxt -= key->len + sizeof(BtKey);
1931                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1932                 memcpy (ptr, key, key->len + sizeof(BtKey));
1933
1934                 //      add librarian slot
1935
1936                 slotptr(bt->frame, ++idx)->off = nxt;
1937                 slotptr(bt->frame, idx)->type = Librarian;
1938                 slotptr(bt->frame, idx)->dead = 1;
1939
1940                 //  add actual slot
1941
1942                 slotptr(bt->frame, ++idx)->off = nxt;
1943                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1944
1945                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1946                         bt->frame->act++;
1947         }
1948
1949         bt->frame->bits = bt->mgr->page_bits;
1950         bt->frame->min = nxt;
1951         bt->frame->cnt = idx;
1952         bt->frame->lvl = lvl;
1953
1954         // link right node
1955
1956         if( set->latch->page_no > ROOT_page )
1957                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1958
1959         //      get new free page and write higher keys to it.
1960
1961         if( bt_newpage(bt, right, bt->frame) )
1962                 return 0;
1963
1964         memcpy (bt->frame, set->page, bt->mgr->page_size);
1965         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1966         set->latch->dirty = 1;
1967
1968         nxt = bt->mgr->page_size;
1969         set->page->garbage = 0;
1970         set->page->act = 0;
1971         max /= 2;
1972         cnt = 0;
1973         idx = 0;
1974
1975         if( slotptr(bt->frame, max)->type == Librarian )
1976                 max--;
1977
1978         //  assemble page of smaller keys
1979
1980         while( cnt++ < max ) {
1981                 if( slotptr(bt->frame, cnt)->dead )
1982                         continue;
1983                 val = valptr(bt->frame, cnt);
1984                 nxt -= val->len + sizeof(BtVal);
1985                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1986
1987                 key = keyptr(bt->frame, cnt);
1988                 nxt -= key->len + sizeof(BtKey);
1989                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1990
1991                 //      add librarian slot
1992
1993                 slotptr(set->page, ++idx)->off = nxt;
1994                 slotptr(set->page, idx)->type = Librarian;
1995                 slotptr(set->page, idx)->dead = 1;
1996
1997                 //      add actual slot
1998
1999                 slotptr(set->page, ++idx)->off = nxt;
2000                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2001                 set->page->act++;
2002         }
2003
2004         bt_putid(set->page->right, right->latch->page_no);
2005         set->page->min = nxt;
2006         set->page->cnt = idx;
2007
2008         return right->latch->entry;
2009 }
2010
2011 //      fix keys for newly split page
2012 //      call with page locked, return
2013 //      unlocked
2014
2015 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2016 {
2017 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2018 unsigned char value[BtId];
2019 uint lvl = set->page->lvl;
2020 BtPage page;
2021 BtKey *ptr;
2022
2023         // if current page is the root page, split it
2024
2025         if( set->latch->page_no == ROOT_page )
2026                 return bt_splitroot (bt, set, right);
2027
2028         ptr = keyptr(set->page, set->page->cnt);
2029         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2030
2031         page = bt_mappage (bt, right);
2032
2033         ptr = keyptr(page, page->cnt);
2034         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2035
2036         // insert new fences in their parent pages
2037
2038         bt_lockpage (bt, BtLockParent, right);
2039
2040         bt_lockpage (bt, BtLockParent, set->latch);
2041         bt_unlockpage (bt, BtLockWrite, set->latch);
2042
2043         // insert new fence for reformulated left block of smaller keys
2044
2045         bt_putid (value, set->latch->page_no);
2046         ptr = (BtKey *)leftkey;
2047
2048         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2049                 return bt->err;
2050
2051         // switch fence for right block of larger keys to new right page
2052
2053         bt_putid (value, right->page_no);
2054         ptr = (BtKey *)rightkey;
2055
2056         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2057                 return bt->err;
2058
2059         bt_unlockpage (bt, BtLockParent, set->latch);
2060         bt_unpinlatch (set->latch);
2061
2062         bt_unlockpage (bt, BtLockParent, right);
2063         bt_unpinlatch (right);
2064         return 0;
2065 }
2066
2067 //      install new key and value onto page
2068 //      page must already be checked for
2069 //      adequate space
2070
2071 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2072 {
2073 uint idx, librarian;
2074 BtSlot *node;
2075 BtKey *ptr;
2076 BtVal *val;
2077
2078         //      if found slot > desired slot and previous slot
2079         //      is a librarian slot, use it
2080
2081         if( slot > 1 )
2082           if( slotptr(set->page, slot-1)->type == Librarian )
2083                 slot--;
2084
2085         // copy value onto page
2086
2087         set->page->min -= vallen + sizeof(BtVal);
2088         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2089         memcpy (val->value, value, vallen);
2090         val->len = vallen;
2091
2092         // copy key onto page
2093
2094         set->page->min -= keylen + sizeof(BtKey);
2095         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2096         memcpy (ptr->key, key, keylen);
2097         ptr->len = keylen;
2098         
2099         //      find first empty slot
2100
2101         for( idx = slot; idx < set->page->cnt; idx++ )
2102           if( slotptr(set->page, idx)->dead )
2103                 break;
2104
2105         // now insert key into array before slot
2106
2107         if( idx == set->page->cnt )
2108                 idx += 2, set->page->cnt += 2, librarian = 2;
2109         else
2110                 librarian = 1;
2111
2112         set->latch->dirty = 1;
2113         set->page->act++;
2114
2115         while( idx > slot + librarian - 1 )
2116                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2117
2118         //      add librarian slot
2119
2120         if( librarian > 1 ) {
2121                 node = slotptr(set->page, slot++);
2122                 node->off = set->page->min;
2123                 node->type = Librarian;
2124                 node->dead = 1;
2125         }
2126
2127         //      fill in new slot
2128
2129         node = slotptr(set->page, slot);
2130         node->off = set->page->min;
2131         node->type = type;
2132         node->dead = 0;
2133
2134         if( release ) {
2135                 bt_unlockpage (bt, BtLockWrite, set->latch);
2136                 bt_unpinlatch (set->latch);
2137         }
2138
2139         return 0;
2140 }
2141
2142 //  Insert new key into the btree at given level.
2143 //      either add a new key or update/add an existing one
2144
2145 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2146 {
2147 unsigned char newkey[BT_keyarray];
2148 uint slot, idx, len, entry;
2149 BtPageSet set[1];
2150 BtKey *ptr, *ins;
2151 uid sequence;
2152 BtVal *val;
2153 uint type;
2154
2155   // set up the key we're working on
2156
2157   ins = (BtKey*)newkey;
2158   memcpy (ins->key, key, keylen);
2159   ins->len = keylen;
2160
2161   // is this a non-unique index value?
2162
2163   if( unique )
2164         type = Unique;
2165   else {
2166         type = Duplicate;
2167         sequence = bt_newdup (bt);
2168         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2169         ins->len += BtId;
2170   }
2171
2172   while( 1 ) { // find the page and slot for the current key
2173         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2174                 ptr = keyptr(set->page, slot);
2175         else {
2176                 if( !bt->err )
2177                         bt->err = BTERR_ovflw;
2178                 return bt->err;
2179         }
2180
2181         // if librarian slot == found slot, advance to real slot
2182
2183         if( slotptr(set->page, slot)->type == Librarian )
2184           if( !keycmp (ptr, key, keylen) )
2185                 ptr = keyptr(set->page, ++slot);
2186
2187         len = ptr->len;
2188
2189         if( slotptr(set->page, slot)->type == Duplicate )
2190                 len -= BtId;
2191
2192         //  if inserting a duplicate key or unique key
2193         //      check for adequate space on the page
2194         //      and insert the new key before slot.
2195
2196         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2197           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2198                 if( !(entry = bt_splitpage (bt, set)) )
2199                   return bt->err;
2200                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2201                   return bt->err;
2202                 else
2203                   continue;
2204
2205           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2206         }
2207
2208         // if key already exists, update value and return
2209
2210         val = valptr(set->page, slot);
2211
2212         if( val->len >= vallen ) {
2213                 if( slotptr(set->page, slot)->dead )
2214                         set->page->act++;
2215                 set->page->garbage += val->len - vallen;
2216                 set->latch->dirty = 1;
2217                 slotptr(set->page, slot)->dead = 0;
2218                 val->len = vallen;
2219                 memcpy (val->value, value, vallen);
2220                 bt_unlockpage(bt, BtLockWrite, set->latch);
2221                 bt_unpinlatch (set->latch);
2222                 return 0;
2223         }
2224
2225         //      new update value doesn't fit in existing value area
2226
2227         if( !slotptr(set->page, slot)->dead )
2228                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2229         else {
2230                 slotptr(set->page, slot)->dead = 0;
2231                 set->page->act++;
2232         }
2233
2234         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2235           if( !(entry = bt_splitpage (bt, set)) )
2236                 return bt->err;
2237           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2238                 return bt->err;
2239           else
2240                 continue;
2241
2242         set->page->min -= vallen + sizeof(BtVal);
2243         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2244         memcpy (val->value, value, vallen);
2245         val->len = vallen;
2246
2247         set->latch->dirty = 1;
2248         set->page->min -= keylen + sizeof(BtKey);
2249         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2250         memcpy (ptr->key, key, keylen);
2251         ptr->len = keylen;
2252         
2253         slotptr(set->page, slot)->off = set->page->min;
2254         bt_unlockpage(bt, BtLockWrite, set->latch);
2255         bt_unpinlatch (set->latch);
2256         return 0;
2257   }
2258   return 0;
2259 }
2260
2261 typedef struct {
2262         uint entry;                     // latch table entry number
2263         uint slot:31;           // page slot number
2264         uint reuse:1;           // reused previous page
2265 } AtomicTxn;
2266
2267 typedef struct {
2268         uid page_no;            // page number for split leaf
2269         void *next;                     // next key to insert
2270         uint entry:29;          // latch table entry number
2271         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2272         uint nounlock:1;        // don't unlock ParentModification
2273         unsigned char leafkey[BT_keyarray];
2274 } AtomicKey;
2275
2276 //  find and load leaf page for given key
2277 //      leave page Atomic locked and Read locked.
2278
2279 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2280 {
2281 BtLatchSet *prevlatch;
2282 uid page_no;
2283 uint slot;
2284
2285   //  find level one slot
2286
2287   if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2288         return 0;
2289
2290   // find next non-dead entry on this page
2291   //    it will be the fence key if nothing else
2292
2293   while( slotptr(set->page, slot)->dead )
2294         if( slot++ < set->page->cnt )
2295           continue;
2296         else
2297           return bt->err = BTERR_struct, 0;
2298
2299   page_no = bt_getid(valptr(set->page, slot)->value);
2300   prevlatch = set->latch;
2301
2302   while( page_no ) {
2303         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2304           set->page = bt_mappage (bt, set->latch);
2305         else
2306           return 0;
2307
2308         // obtain read lock using lock chaining with Access mode
2309         //      release & unpin parent/left sibling page
2310
2311         bt_lockpage(bt, BtLockAccess, set->latch);
2312
2313         bt_unlockpage(bt, BtLockRead, prevlatch);
2314         bt_unpinlatch (prevlatch);
2315
2316         bt_lockpage(bt, BtLockRead, set->latch);
2317
2318         //  find key on page at this level
2319         //  and descend to requested level
2320
2321         if( !set->page->kill )
2322          if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2323           bt_unlockpage(bt, BtLockRead, set->latch);
2324           bt_lockpage(bt, BtLockAtomic, set->latch);
2325           bt_lockpage(bt, BtLockRead, set->latch);
2326
2327           if( !set->page->kill )
2328            if( slot = bt_findslot (set->page, key, len) ) {
2329                 bt_unlockpage(bt, BtLockAccess, set->latch);
2330                 return slot;
2331            }
2332
2333           bt_unlockpage(bt, BtLockAtomic, set->latch);
2334           }
2335
2336         //  slide right into next page
2337
2338         bt_unlockpage(bt, BtLockAccess, set->latch);
2339         page_no = bt_getid(set->page->right);
2340         prevlatch = set->latch;
2341   }
2342
2343   // return error on end of right chain
2344
2345   bt->err = BTERR_struct;
2346   return 0;     // return error
2347 }
2348
2349 //      determine actual page where key is located
2350 //  return slot number
2351
2352 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2353 {
2354 BtKey *key = keyptr(source,src);
2355 uint slot = locks[src].slot;
2356 uint entry;
2357
2358         if( src > 1 && locks[src].reuse )
2359           entry = locks[src-1].entry, slot = 0;
2360         else
2361           entry = locks[src].entry;
2362
2363         if( slot ) {
2364                 set->latch = bt->mgr->latchsets + entry;
2365                 set->page = bt_mappage (bt, set->latch);
2366                 return slot;
2367         }
2368
2369         //      is locks->reuse set? or was slot zeroed?
2370         //      if so, find where our key is located 
2371         //      on current page or pages split on
2372         //      same page txn operations.
2373
2374         do {
2375                 set->latch = bt->mgr->latchsets + entry;
2376                 set->page = bt_mappage (bt, set->latch);
2377
2378                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2379                   if( slotptr(set->page, slot)->type == Librarian )
2380                         slot++;
2381                   if( locks[src].reuse )
2382                         locks[src].entry = entry;
2383                   return slot;
2384                 }
2385         } while( entry = set->latch->split );
2386
2387         bt->err = BTERR_atomic;
2388         return 0;
2389 }
2390
2391 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2392 {
2393 BtKey *key = keyptr(source, src);
2394 BtVal *val = valptr(source, src);
2395 BtLatchSet *latch;
2396 BtPageSet set[1];
2397 uint entry, slot;
2398
2399   while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2400         if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
2401           return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2402
2403         if( entry = bt_splitpage (bt, set) )
2404           latch = bt->mgr->latchsets + entry;
2405         else
2406           return bt->err;
2407
2408         //      splice right page into split chain
2409         //      and WriteLock it.
2410
2411         bt_lockpage(bt, BtLockWrite, latch);
2412         latch->split = set->latch->split;
2413         set->latch->split = entry;
2414         locks[src].slot = 0;
2415   }
2416
2417   return bt->err = BTERR_atomic;
2418 }
2419
2420 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2421 {
2422 BtKey *key = keyptr(source, src);
2423 uint idx, entry, slot;
2424 BtPageSet set[1];
2425 BtKey *ptr;
2426 BtVal *val;
2427
2428         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2429           ptr = keyptr(set->page, slot);
2430         else
2431           return bt->err = BTERR_struct;
2432
2433         if( !keycmp (ptr, key->key, key->len) )
2434           if( !slotptr(set->page, slot)->dead )
2435                 slotptr(set->page, slot)->dead = 1;
2436           else
2437                 return 0;
2438         else
2439                 return 0;
2440
2441         val = valptr(set->page, slot);
2442         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2443         set->latch->dirty = 1;
2444         set->page->act--;
2445         bt->found++;
2446         return 0;
2447 }
2448
2449 //      delete an empty master page for a transaction
2450
2451 //      note that the far right page never empties because
2452 //      it always contains (at least) the infinite stopper key
2453 //      and that all pages that don't contain any keys are
2454 //      deleted, or are being held under Atomic lock.
2455
2456 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2457 {
2458 BtPageSet right[1], temp[1];
2459 unsigned char value[BtId];
2460 uid right_page_no;
2461 BtKey *ptr;
2462
2463         bt_lockpage(bt, BtLockWrite, prev->latch);
2464
2465         //      grab the right sibling
2466
2467         if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2468                 right->page = bt_mappage (bt, right->latch);
2469         else
2470                 return bt->err;
2471
2472         bt_lockpage(bt, BtLockAtomic, right->latch);
2473         bt_lockpage(bt, BtLockWrite, right->latch);
2474
2475         //      and pull contents over empty page
2476         //      while preserving master's left link
2477
2478         memcpy (right->page->left, prev->page->left, BtId);
2479         memcpy (prev->page, right->page, bt->mgr->page_size);
2480
2481         //      forward seekers to old right sibling
2482         //      to new page location in set
2483
2484         bt_putid (right->page->right, prev->latch->page_no);
2485         right->latch->dirty = 1;
2486         right->page->kill = 1;
2487
2488         //      remove pointer to right page for searchers by
2489         //      changing right fence key to point to the master page
2490
2491         ptr = keyptr(right->page,right->page->cnt);
2492         bt_putid (value, prev->latch->page_no);
2493
2494         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2495                 return bt->err;
2496
2497         //  now that master page is in good shape we can
2498         //      remove its locks.
2499
2500         bt_unlockpage (bt, BtLockAtomic, prev->latch);
2501         bt_unlockpage (bt, BtLockWrite, prev->latch);
2502
2503         //  fix master's right sibling's left pointer
2504         //      to remove scanner's poiner to the right page
2505
2506         if( right_page_no = bt_getid (prev->page->right) ) {
2507           if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2508                 temp->page = bt_mappage (bt, temp->latch);
2509
2510           bt_lockpage (bt, BtLockWrite, temp->latch);
2511           bt_putid (temp->page->left, prev->latch->page_no);
2512           temp->latch->dirty = 1;
2513
2514           bt_unlockpage (bt, BtLockWrite, temp->latch);
2515           bt_unpinlatch (temp->latch);
2516         } else {        // master is now the far right page
2517           bt_spinwritelock (bt->mgr->lock);
2518           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2519           bt_spinreleasewrite(bt->mgr->lock);
2520         }
2521
2522         //      now that there are no pointers to the right page
2523         //      we can delete it after the last read access occurs
2524
2525         bt_unlockpage (bt, BtLockWrite, right->latch);
2526         bt_unlockpage (bt, BtLockAtomic, right->latch);
2527         bt_lockpage (bt, BtLockDelete, right->latch);
2528         bt_lockpage (bt, BtLockWrite, right->latch);
2529         bt_freepage (bt, right);
2530         return 0;
2531 }
2532
2533 //      atomic modification of a batch of keys.
2534
2535 //      return -1 if BTERR is set
2536 //      otherwise return slot number
2537 //      causing the key constraint violation
2538 //      or zero on successful completion.
2539
2540 int bt_atomictxn (BtDb *bt, BtPage source)
2541 {
2542 uint src, idx, slot, samepage, entry;
2543 AtomicKey *head, *tail, *leaf;
2544 BtPageSet set[1], prev[1];
2545 unsigned char value[BtId];
2546 BtKey *key, *ptr, *key2;
2547 BtLatchSet *latch;
2548 AtomicTxn *locks;
2549 int result = 0;
2550 BtSlot temp[1];
2551 BtPage page;
2552 BtVal *val;
2553 uid right;
2554 int type;
2555
2556   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2557   head = NULL;
2558   tail = NULL;
2559
2560   // stable sort the list of keys into order to
2561   //    prevent deadlocks between threads.
2562
2563   for( src = 1; src++ < source->cnt; ) {
2564         *temp = *slotptr(source,src);
2565         key = keyptr (source,src);
2566
2567         for( idx = src; --idx; ) {
2568           key2 = keyptr (source,idx);
2569           if( keycmp (key, key2->key, key2->len) < 0 ) {
2570                 *slotptr(source,idx+1) = *slotptr(source,idx);
2571                 *slotptr(source,idx) = *temp;
2572           } else
2573                 break;
2574         }
2575   }
2576
2577   // Load the leaf page for each key
2578   // group same page references with reuse bit
2579   // and determine any constraint violations
2580
2581   for( src = 0; src++ < source->cnt; ) {
2582         key = keyptr(source, src);
2583         slot = 0;
2584
2585         // first determine if this modification falls
2586         // on the same page as the previous modification
2587         //      note that the far right leaf page is a special case
2588
2589         if( samepage = src > 1 )
2590           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2591                 slot = bt_findslot(set->page, key->key, key->len);
2592           else // release read on previous page
2593                 bt_unlockpage(bt, BtLockRead, set->latch); 
2594
2595         if( !slot )
2596           if( slot = bt_atomicload(bt, set, key->key, key->len) )
2597                 set->latch->split = 0;
2598           else
2599                 return -1;
2600
2601         if( slotptr(set->page, slot)->type == Librarian )
2602           ptr = keyptr(set->page, ++slot);
2603         else
2604           ptr = keyptr(set->page, slot);
2605
2606         if( !samepage ) {
2607           locks[src].entry = set->latch->entry;
2608           locks[src].slot = slot;
2609           locks[src].reuse = 0;
2610         } else {
2611           locks[src].entry = 0;
2612           locks[src].slot = 0;
2613           locks[src].reuse = 1;
2614         }
2615
2616         switch( slotptr(source, src)->type ) {
2617         case Duplicate:
2618         case Unique:
2619           if( !slotptr(set->page, slot)->dead )
2620            if( slot < set->page->cnt || bt_getid (set->page->right) )
2621             if( !keycmp (ptr, key->key, key->len) ) {
2622
2623                   // return constraint violation if key already exists
2624
2625                   bt_unlockpage(bt, BtLockRead, set->latch);
2626                   result = src;
2627
2628                   while( src ) {
2629                         if( locks[src].entry ) {
2630                           set->latch = bt->mgr->latchsets + locks[src].entry;
2631                           bt_unlockpage(bt, BtLockAtomic, set->latch);
2632                           bt_unpinlatch (set->latch);
2633                         }
2634                         src--;
2635                   }
2636                   free (locks);
2637                   return result;
2638                 }
2639           break;
2640         }
2641   }
2642
2643   //  unlock last loadpage lock
2644
2645   if( source->cnt > 1 )
2646         bt_unlockpage(bt, BtLockRead, set->latch);
2647
2648   //  obtain write lock for each master page
2649
2650   for( src = 0; src++ < source->cnt; )
2651         if( locks[src].reuse )
2652           continue;
2653         else
2654           bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2655
2656   // insert or delete each key
2657   // process any splits or merges
2658   // release Write & Atomic latches
2659   // set ParentModifications and build
2660   // queue of keys to insert for split pages
2661   // or delete for deleted pages.
2662
2663   // run through txn list backwards
2664
2665   samepage = source->cnt + 1;
2666
2667   for( src = source->cnt; src; src-- ) {
2668         if( locks[src].reuse )
2669           continue;
2670
2671         //  perform the txn operations
2672         //      from smaller to larger on
2673         //  the same page
2674
2675         for( idx = src; idx < samepage; idx++ )
2676          switch( slotptr(source,idx)->type ) {
2677          case Delete:
2678           if( bt_atomicdelete (bt, source, locks, idx) )
2679                 return -1;
2680           break;
2681
2682         case Duplicate:
2683         case Unique:
2684           if( bt_atomicinsert (bt, source, locks, idx) )
2685                 return -1;
2686           break;
2687         }
2688
2689         //      after the same page operations have finished,
2690         //  process master page for splits or deletion.
2691
2692         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2693         prev->page = bt_mappage (bt, prev->latch);
2694         samepage = src;
2695
2696         //  pick-up all splits from master page
2697         //      each one is already WriteLocked.
2698
2699         entry = prev->latch->split;
2700
2701         while( entry ) {
2702           set->latch = bt->mgr->latchsets + entry;
2703           set->page = bt_mappage (bt, set->latch);
2704           entry = set->latch->split;
2705
2706           // delete empty master page by undoing its split
2707           //  (this is potentially another empty page)
2708           //  note that there are no new left pointers yet
2709
2710           if( !prev->page->act ) {
2711                 memcpy (set->page->left, prev->page->left, BtId);
2712                 memcpy (prev->page, set->page, bt->mgr->page_size);
2713                 bt_lockpage (bt, BtLockDelete, set->latch);
2714                 bt_freepage (bt, set);
2715
2716                 prev->latch->dirty = 1;
2717                 continue;
2718           }
2719
2720           // remove empty page from the split chain
2721
2722           if( !set->page->act ) {
2723                 memcpy (prev->page->right, set->page->right, BtId);
2724                 prev->latch->split = set->latch->split;
2725                 bt_lockpage (bt, BtLockDelete, set->latch);
2726                 bt_freepage (bt, set);
2727                 continue;
2728           }
2729
2730           //  schedule prev fence key update
2731
2732           ptr = keyptr(prev->page,prev->page->cnt);
2733           leaf = calloc (sizeof(AtomicKey), 1);
2734
2735           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2736           leaf->page_no = prev->latch->page_no;
2737           leaf->entry = prev->latch->entry;
2738           leaf->type = 0;
2739
2740           if( tail )
2741                 tail->next = leaf;
2742           else
2743                 head = leaf;
2744
2745           tail = leaf;
2746
2747           // splice in the left link into the split page
2748
2749           bt_putid (set->page->left, prev->latch->page_no);
2750           bt_lockpage(bt, BtLockParent, prev->latch);
2751           bt_unlockpage(bt, BtLockWrite, prev->latch);
2752           *prev = *set;
2753         }
2754
2755         //  update left pointer in next right page from last split page
2756         //      (if all splits were reversed, latch->split == 0)
2757
2758         if( latch->split ) {
2759           //  fix left pointer in master's original (now split)
2760           //  far right sibling or set rightmost page in page zero
2761
2762           if( right = bt_getid (prev->page->right) ) {
2763                 if( set->latch = bt_pinlatch (bt, right, 1) )
2764                   set->page = bt_mappage (bt, set->latch);
2765                 else
2766                   return -1;
2767
2768             bt_lockpage (bt, BtLockWrite, set->latch);
2769             bt_putid (set->page->left, prev->latch->page_no);
2770                 set->latch->dirty = 1;
2771             bt_unlockpage (bt, BtLockWrite, set->latch);
2772                 bt_unpinlatch (set->latch);
2773           } else {      // prev is rightmost page
2774             bt_spinwritelock (bt->mgr->lock);
2775                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2776             bt_spinreleasewrite(bt->mgr->lock);
2777           }
2778
2779           //  Process last page split in chain
2780
2781           ptr = keyptr(prev->page,prev->page->cnt);
2782           leaf = calloc (sizeof(AtomicKey), 1);
2783
2784           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2785           leaf->page_no = prev->latch->page_no;
2786           leaf->entry = prev->latch->entry;
2787           leaf->type = 0;
2788   
2789           if( tail )
2790                 tail->next = leaf;
2791           else
2792                 head = leaf;
2793
2794           tail = leaf;
2795
2796           bt_lockpage(bt, BtLockParent, prev->latch);
2797           bt_unlockpage(bt, BtLockWrite, prev->latch);
2798
2799           //  remove atomic lock on master page
2800
2801           bt_unlockpage(bt, BtLockAtomic, latch);
2802           continue;
2803         }
2804
2805         //  finished if prev page occupied (either master or final split)
2806
2807         if( prev->page->act ) {
2808           bt_unlockpage(bt, BtLockWrite, latch);
2809           bt_unlockpage(bt, BtLockAtomic, latch);
2810           bt_unpinlatch(latch);
2811           continue;
2812         }
2813
2814         // any and all splits were reversed, and the
2815         // master page located in prev is empty, delete it
2816         // by pulling over master's right sibling.
2817
2818         // Remove empty master's fence key
2819
2820         ptr = keyptr(prev->page,prev->page->cnt);
2821
2822         if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2823                 return -1;
2824
2825         //      perform the remainder of the delete
2826         //      from the FIFO queue
2827
2828         leaf = calloc (sizeof(AtomicKey), 1);
2829
2830         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2831         leaf->page_no = prev->latch->page_no;
2832         leaf->entry = prev->latch->entry;
2833         leaf->nounlock = 1;
2834         leaf->type = 2;
2835   
2836         if( tail )
2837           tail->next = leaf;
2838         else
2839           head = leaf;
2840
2841         tail = leaf;
2842
2843         //      leave atomic lock in place until
2844         //      deletion completes in next phase.
2845
2846         bt_unlockpage(bt, BtLockWrite, prev->latch);
2847   }
2848   
2849   //  add & delete keys for any pages split or merged during transaction
2850
2851   if( leaf = head )
2852     do {
2853           set->latch = bt->mgr->latchsets + leaf->entry;
2854           set->page = bt_mappage (bt, set->latch);
2855
2856           bt_putid (value, leaf->page_no);
2857           ptr = (BtKey *)leaf->leafkey;
2858
2859           switch( leaf->type ) {
2860           case 0:       // insert key
2861             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2862                   return -1;
2863
2864                 break;
2865
2866           case 1:       // delete key
2867                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2868                   return -1;
2869
2870                 break;
2871
2872           case 2:       // free page
2873                 if( bt_atomicfree (bt, set) )
2874                   return -1;
2875
2876                 break;
2877           }
2878
2879           if( !leaf->nounlock )
2880             bt_unlockpage (bt, BtLockParent, set->latch);
2881
2882           bt_unpinlatch (set->latch);
2883           tail = leaf->next;
2884           free (leaf);
2885         } while( leaf = tail );
2886
2887   // return success
2888
2889   free (locks);
2890   return 0;
2891 }
2892
2893 //      set cursor to highest slot on highest page
2894
2895 uint bt_lastkey (BtDb *bt)
2896 {
2897 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2898 BtPageSet set[1];
2899
2900         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2901                 set->page = bt_mappage (bt, set->latch);
2902         else
2903                 return 0;
2904
2905     bt_lockpage(bt, BtLockRead, set->latch);
2906         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2907     bt_unlockpage(bt, BtLockRead, set->latch);
2908         bt_unpinlatch (set->latch);
2909
2910         bt->cursor_page = page_no;
2911         return bt->cursor->cnt;
2912 }
2913
2914 //      return previous slot on cursor page
2915
2916 uint bt_prevkey (BtDb *bt, uint slot)
2917 {
2918 uid ourright, next, us = bt->cursor_page;
2919 BtPageSet set[1];
2920
2921         if( --slot )
2922                 return slot;
2923
2924         ourright = bt_getid(bt->cursor->right);
2925
2926 goleft:
2927         if( !(next = bt_getid(bt->cursor->left)) )
2928                 return 0;
2929
2930 findourself:
2931         bt->cursor_page = next;
2932
2933         if( set->latch = bt_pinlatch (bt, next, 1) )
2934                 set->page = bt_mappage (bt, set->latch);
2935         else
2936                 return 0;
2937
2938     bt_lockpage(bt, BtLockRead, set->latch);
2939         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2940         bt_unlockpage(bt, BtLockRead, set->latch);
2941         bt_unpinlatch (set->latch);
2942         
2943         next = bt_getid (bt->cursor->right);
2944
2945         if( bt->cursor->kill )
2946                 goto findourself;
2947
2948         if( next != us )
2949           if( next == ourright )
2950                 goto goleft;
2951           else
2952                 goto findourself;
2953
2954         return bt->cursor->cnt;
2955 }
2956
2957 //  return next slot on cursor page
2958 //  or slide cursor right into next page
2959
2960 uint bt_nextkey (BtDb *bt, uint slot)
2961 {
2962 BtPageSet set[1];
2963 uid right;
2964
2965   do {
2966         right = bt_getid(bt->cursor->right);
2967
2968         while( slot++ < bt->cursor->cnt )
2969           if( slotptr(bt->cursor,slot)->dead )
2970                 continue;
2971           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2972                 return slot;
2973           else
2974                 break;
2975
2976         if( !right )
2977                 break;
2978
2979         bt->cursor_page = right;
2980
2981         if( set->latch = bt_pinlatch (bt, right, 1) )
2982                 set->page = bt_mappage (bt, set->latch);
2983         else
2984                 return 0;
2985
2986     bt_lockpage(bt, BtLockRead, set->latch);
2987
2988         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2989
2990         bt_unlockpage(bt, BtLockRead, set->latch);
2991         bt_unpinlatch (set->latch);
2992         slot = 0;
2993
2994   } while( 1 );
2995
2996   return bt->err = 0;
2997 }
2998
2999 //  cache page of keys into cursor and return starting slot for given key
3000
3001 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3002 {
3003 BtPageSet set[1];
3004 uint slot;
3005
3006         // cache page for retrieval
3007
3008         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3009           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3010         else
3011           return 0;
3012
3013         bt->cursor_page = set->latch->page_no;
3014
3015         bt_unlockpage(bt, BtLockRead, set->latch);
3016         bt_unpinlatch (set->latch);
3017         return slot;
3018 }
3019
3020 BtKey *bt_key(BtDb *bt, uint slot)
3021 {
3022         return keyptr(bt->cursor, slot);
3023 }
3024
3025 BtVal *bt_val(BtDb *bt, uint slot)
3026 {
3027         return valptr(bt->cursor,slot);
3028 }
3029
3030 #ifdef STANDALONE
3031
3032 #ifndef unix
3033 double getCpuTime(int type)
3034 {
3035 FILETIME crtime[1];
3036 FILETIME xittime[1];
3037 FILETIME systime[1];
3038 FILETIME usrtime[1];
3039 SYSTEMTIME timeconv[1];
3040 double ans = 0;
3041
3042         memset (timeconv, 0, sizeof(SYSTEMTIME));
3043
3044         switch( type ) {
3045         case 0:
3046                 GetSystemTimeAsFileTime (xittime);
3047                 FileTimeToSystemTime (xittime, timeconv);
3048                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3049                 break;
3050         case 1:
3051                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3052                 FileTimeToSystemTime (usrtime, timeconv);
3053                 break;
3054         case 2:
3055                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3056                 FileTimeToSystemTime (systime, timeconv);
3057                 break;
3058         }
3059
3060         ans += (double)timeconv->wHour * 3600;
3061         ans += (double)timeconv->wMinute * 60;
3062         ans += (double)timeconv->wSecond;
3063         ans += (double)timeconv->wMilliseconds / 1000;
3064         return ans;
3065 }
3066 #else
3067 #include <time.h>
3068 #include <sys/resource.h>
3069
3070 double getCpuTime(int type)
3071 {
3072 struct rusage used[1];
3073 struct timeval tv[1];
3074
3075         switch( type ) {
3076         case 0:
3077                 gettimeofday(tv, NULL);
3078                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3079
3080         case 1:
3081                 getrusage(RUSAGE_SELF, used);
3082                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3083
3084         case 2:
3085                 getrusage(RUSAGE_SELF, used);
3086                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3087         }
3088
3089         return 0;
3090 }
3091 #endif
3092
3093 void bt_poolaudit (BtMgr *mgr)
3094 {
3095 BtLatchSet *latch;
3096 uint slot = 0;
3097
3098         while( slot++ < mgr->latchdeployed ) {
3099                 latch = mgr->latchsets + slot;
3100
3101                 if( *latch->readwr->rin & MASK )
3102                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3103                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3104
3105                 if( *latch->access->rin & MASK )
3106                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3107                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3108
3109                 if( *latch->parent->ticket != *latch->parent->serving )
3110                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3111                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3112
3113                 if( latch->pin & ~CLOCK_bit ) {
3114                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3115                         latch->pin = 0;
3116                 }
3117         }
3118 }
3119
3120 uint bt_latchaudit (BtDb *bt)
3121 {
3122 ushort idx, hashidx;
3123 uid next, page_no;
3124 BtLatchSet *latch;
3125 uint cnt = 0;
3126 BtKey *ptr;
3127
3128         if( *(ushort *)(bt->mgr->lock) )
3129                 fprintf(stderr, "Alloc page locked\n");
3130         *(ushort *)(bt->mgr->lock) = 0;
3131
3132         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3133                 latch = bt->mgr->latchsets + idx;
3134                 if( *latch->readwr->rin & MASK )
3135                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3136                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3137
3138                 if( *latch->access->rin & MASK )
3139                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3140                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3141
3142                 if( *latch->parent->ticket != *latch->parent->serving )
3143                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3144                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3145
3146                 if( latch->pin ) {
3147                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3148                         latch->pin = 0;
3149                 }
3150         }
3151
3152         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3153           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3154                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3155
3156           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3157
3158           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3159                 latch = bt->mgr->latchsets + idx;
3160                 if( latch->pin )
3161                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3162           } while( idx = latch->next );
3163         }
3164
3165         page_no = LEAF_page;
3166
3167         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3168         uid off = page_no << bt->mgr->page_bits;
3169 #ifdef unix
3170           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3171 #else
3172         DWORD amt[1];
3173
3174           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3175
3176           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3177                 return bt->err = BTERR_map;
3178
3179           if( *amt <  bt->mgr->page_size )
3180                 return bt->err = BTERR_map;
3181 #endif
3182                 if( !bt->frame->free && !bt->frame->lvl )
3183                         cnt += bt->frame->act;
3184                 page_no++;
3185         }
3186                 
3187         cnt--;  // remove stopper key
3188         fprintf(stderr, " Total keys read %d\n", cnt);
3189
3190         bt_close (bt);
3191         return 0;
3192 }
3193
3194 typedef struct {
3195         char idx;
3196         char *type;
3197         char *infile;
3198         BtMgr *mgr;
3199         int num;
3200 } ThreadArg;
3201
3202 //  standalone program to index file of keys
3203 //  then list them onto std-out
3204
3205 #ifdef unix
3206 void *index_file (void *arg)
3207 #else
3208 uint __stdcall index_file (void *arg)
3209 #endif
3210 {
3211 int line = 0, found = 0, cnt = 0, idx;
3212 uid next, page_no = LEAF_page;  // start on first page of leaves
3213 int ch, len = 0, slot, type = 0;
3214 unsigned char key[BT_maxkey];
3215 unsigned char txn[65536];
3216 ThreadArg *args = arg;
3217 BtPageSet set[1];
3218 uint nxt = 65536;
3219 BtPage page;
3220 BtKey *ptr;
3221 BtVal *val;
3222 BtDb *bt;
3223 FILE *in;
3224
3225         bt = bt_open (args->mgr);
3226         page = (BtPage)txn;
3227
3228         if( args->idx < strlen (args->type) )
3229                 ch = args->type[args->idx];
3230         else
3231                 ch = args->type[strlen(args->type) - 1];
3232
3233         switch(ch | 0x20)
3234         {
3235         case 'a':
3236                 fprintf(stderr, "started latch mgr audit\n");
3237                 cnt = bt_latchaudit (bt);
3238                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3239                 break;
3240
3241         case 'd':
3242                 type = Delete;
3243
3244         case 'p':
3245                 if( !type )
3246                         type = Unique;
3247
3248                 if( args->num )
3249                  if( type == Delete )
3250                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3251                  else
3252                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3253                 else
3254                  if( type == Delete )
3255                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3256                  else
3257                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3258
3259                 if( in = fopen (args->infile, "rb") )
3260                   while( ch = getc(in), ch != EOF )
3261                         if( ch == '\n' )
3262                         {
3263                           line++;
3264
3265                           if( !args->num ) {
3266                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3267                                   fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3268                             len = 0;
3269                                 continue;
3270                           }
3271
3272                           nxt -= len - 10;
3273                           memcpy (txn + nxt, key + 10, len - 10);
3274                           nxt -= 1;
3275                           txn[nxt] = len - 10;
3276                           nxt -= 10;
3277                           memcpy (txn + nxt, key, 10);
3278                           nxt -= 1;
3279                           txn[nxt] = 10;
3280                           slotptr(page,++cnt)->off  = nxt;
3281                           slotptr(page,cnt)->type = type;
3282                           len = 0;
3283
3284                           if( cnt < args->num )
3285                                 continue;
3286
3287                           page->cnt = cnt;
3288                           page->act = cnt;
3289                           page->min = nxt;
3290
3291                           if( bt_atomictxn (bt, page) )
3292                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3293                           nxt = sizeof(txn);
3294                           cnt = 0;
3295
3296                         }
3297                         else if( len < BT_maxkey )
3298                                 key[len++] = ch;
3299                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3300                 break;
3301
3302         case 'w':
3303                 fprintf(stderr, "started indexing for %s\n", args->infile);
3304                 if( in = fopen (args->infile, "r") )
3305                   while( ch = getc(in), ch != EOF )
3306                         if( ch == '\n' )
3307                         {
3308                           line++;
3309
3310                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3311                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3312                           len = 0;
3313                         }
3314                         else if( len < BT_maxkey )
3315                                 key[len++] = ch;
3316                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3317                 break;
3318
3319         case 'f':
3320                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3321                 if( in = fopen (args->infile, "rb") )
3322                   while( ch = getc(in), ch != EOF )
3323                         if( ch == '\n' )
3324                         {
3325                           line++;
3326                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3327                                 found++;
3328                           else if( bt->err )
3329                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3330                           len = 0;
3331                         }
3332                         else if( len < BT_maxkey )
3333                                 key[len++] = ch;
3334                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3335                 break;
3336
3337         case 's':
3338                 fprintf(stderr, "started scanning\n");
3339                 do {
3340                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3341                                 set->page = bt_mappage (bt, set->latch);
3342                         else
3343                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3344                         bt_lockpage (bt, BtLockRead, set->latch);
3345                         next = bt_getid (set->page->right);
3346
3347                         for( slot = 0; slot++ < set->page->cnt; )
3348                          if( next || slot < set->page->cnt )
3349                           if( !slotptr(set->page, slot)->dead ) {
3350                                 ptr = keyptr(set->page, slot);
3351                                 len = ptr->len;
3352
3353                             if( slotptr(set->page, slot)->type == Duplicate )
3354                                         len -= BtId;
3355
3356                                 fwrite (ptr->key, len, 1, stdout);
3357                                 val = valptr(set->page, slot);
3358                                 fwrite (val->value, val->len, 1, stdout);
3359                                 fputc ('\n', stdout);
3360                                 cnt++;
3361                            }
3362
3363                         bt_unlockpage (bt, BtLockRead, set->latch);
3364                         bt_unpinlatch (set->latch);
3365                 } while( page_no = next );
3366
3367                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3368                 break;
3369
3370         case 'r':
3371                 fprintf(stderr, "started reverse scan\n");
3372                 if( slot = bt_lastkey (bt) )
3373                    while( slot = bt_prevkey (bt, slot) ) {
3374                         if( slotptr(bt->cursor, slot)->dead )
3375                           continue;
3376
3377                         ptr = keyptr(bt->cursor, slot);
3378                         len = ptr->len;
3379
3380                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3381                                 len -= BtId;
3382
3383                         fwrite (ptr->key, len, 1, stdout);
3384                         val = valptr(bt->cursor, slot);
3385                         fwrite (val->value, val->len, 1, stdout);
3386                         fputc ('\n', stdout);
3387                         cnt++;
3388                   }
3389
3390                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3391                 break;
3392
3393         case 'c':
3394 #ifdef unix
3395                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3396 #endif
3397                 fprintf(stderr, "started counting\n");
3398                 page_no = LEAF_page;
3399
3400                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3401                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3402                                 break;
3403
3404                         if( !bt->frame->free && !bt->frame->lvl )
3405                                 cnt += bt->frame->act;
3406
3407                         bt->reads++;
3408                         page_no++;
3409                 }
3410                 
3411                 cnt--;  // remove stopper key
3412                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3413                 break;
3414         }
3415
3416         bt_close (bt);
3417 #ifdef unix
3418         return NULL;
3419 #else
3420         return 0;
3421 #endif
3422 }
3423
3424 typedef struct timeval timer;
3425
3426 int main (int argc, char **argv)
3427 {
3428 int idx, cnt, len, slot, err;
3429 int segsize, bits = 16;
3430 double start, stop;
3431 #ifdef unix
3432 pthread_t *threads;
3433 #else
3434 HANDLE *threads;
3435 #endif
3436 ThreadArg *args;
3437 uint poolsize = 0;
3438 float elapsed;
3439 int num = 0;
3440 char key[1];
3441 BtMgr *mgr;
3442 BtKey *ptr;
3443 BtDb *bt;
3444
3445         if( argc < 3 ) {
3446                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3447                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3448                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3449                 fprintf (stderr, "  page_bits is the page size in bits\n");
3450                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3451                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3452                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3453                 exit(0);
3454         }
3455
3456         start = getCpuTime(0);
3457
3458         if( argc > 3 )
3459                 bits = atoi(argv[3]);
3460
3461         if( argc > 4 )
3462                 poolsize = atoi(argv[4]);
3463
3464         if( !poolsize )
3465                 fprintf (stderr, "Warning: no mapped_pool\n");
3466
3467         if( argc > 5 )
3468                 num = atoi(argv[5]);
3469
3470         cnt = argc - 6;
3471 #ifdef unix
3472         threads = malloc (cnt * sizeof(pthread_t));
3473 #else
3474         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3475 #endif
3476         args = malloc (cnt * sizeof(ThreadArg));
3477
3478         mgr = bt_mgr ((argv[1]), bits, poolsize);
3479
3480         if( !mgr ) {
3481                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3482                 exit (1);
3483         }
3484
3485         //      fire off threads
3486
3487         for( idx = 0; idx < cnt; idx++ ) {
3488                 args[idx].infile = argv[idx + 6];
3489                 args[idx].type = argv[2];
3490                 args[idx].mgr = mgr;
3491                 args[idx].num = num;
3492                 args[idx].idx = idx;
3493 #ifdef unix
3494                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3495                         fprintf(stderr, "Error creating thread %d\n", err);
3496 #else
3497                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3498 #endif
3499         }
3500
3501         //      wait for termination
3502
3503 #ifdef unix
3504         for( idx = 0; idx < cnt; idx++ )
3505                 pthread_join (threads[idx], NULL);
3506 #else
3507         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3508
3509         for( idx = 0; idx < cnt; idx++ )
3510                 CloseHandle(threads[idx]);
3511
3512 #endif
3513         bt_poolaudit(mgr);
3514         bt_mgrclose (mgr);
3515
3516         elapsed = getCpuTime(0) - start;
3517         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3518         elapsed = getCpuTime(1);
3519         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3520         elapsed = getCpuTime(2);
3521         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3522 }
3523
3524 #endif  //STANDALONE