]> pd.if.org Git - btree/blob - threadskv8.c
Fix bug in bt_cleanpage and bt_splitpage
[btree] / threadskv8.c
1 // btree version threadskv8 sched_yield version
2 //      with reworked bt_deletekey code,
3 //      phase-fair reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      and ACID batched key-value updates
9
10 // 26 SEP 2014
11
12 // author: karl malbrain, malbrain@cal.berkeley.edu
13
14 /*
15 This work, including the source code, documentation
16 and related data, is placed into the public domain.
17
18 The orginal author is Karl Malbrain.
19
20 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
21 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
22 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
23 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
24 RESULTING FROM THE USE, MODIFICATION, OR
25 REDISTRIBUTION OF THIS SOFTWARE.
26 */
27
28 // Please see the project home page for documentation
29 // code.google.com/p/high-concurrency-btree
30
31 #define _FILE_OFFSET_BITS 64
32 #define _LARGEFILE64_SOURCE
33
34 #ifdef linux
35 #define _GNU_SOURCE
36 #endif
37
38 #ifdef unix
39 #include <unistd.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <fcntl.h>
43 #include <sys/time.h>
44 #include <sys/mman.h>
45 #include <errno.h>
46 #include <pthread.h>
47 #else
48 #define WIN32_LEAN_AND_MEAN
49 #include <windows.h>
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <time.h>
53 #include <fcntl.h>
54 #include <process.h>
55 #include <intrin.h>
56 #endif
57
58 #include <memory.h>
59 #include <string.h>
60 #include <stddef.h>
61
62 typedef unsigned long long      uid;
63
64 #ifndef unix
65 typedef unsigned long long      off64_t;
66 typedef unsigned short          ushort;
67 typedef unsigned int            uint;
68 #endif
69
70 #define BT_ro 0x6f72    // ro
71 #define BT_rw 0x7772    // rw
72
73 #define BT_maxbits              24                                      // maximum page size in bits
74 #define BT_minbits              9                                       // minimum page size in bits
75 #define BT_minpage              (1 << BT_minbits)       // minimum page size
76 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
77
78 //  BTree page number constants
79 #define ALLOC_page              0       // allocation page
80 #define ROOT_page               1       // root of the btree
81 #define LEAF_page               2       // first page of leaves
82
83 //      Number of levels to create in a new BTree
84
85 #define MIN_lvl                 2
86
87 /*
88 There are six lock types for each node in four independent sets: 
89 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
90 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
91 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
92 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
93 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
94 6. (set 4) AtomicModification: Exclusive. Atomic Update including node is underway. Incompatible with another AtomicModification. 
95 */
96
97 typedef enum{
98         BtLockAccess = 1,
99         BtLockDelete = 2,
100         BtLockRead   = 4,
101         BtLockWrite  = 8,
102         BtLockParent = 16,
103         BtLockAtomic = 32
104 } BtLock;
105
106 //      definition for phase-fair reader/writer lock implementation
107
108 typedef struct {
109         ushort rin[1];
110         ushort rout[1];
111         ushort ticket[1];
112         ushort serving[1];
113 } RWLock;
114
115 //      write only queue lock
116
117 typedef struct {
118         ushort ticket[1];
119         ushort serving[1];
120 } WOLock;
121
122 #define PHID 0x1
123 #define PRES 0x2
124 #define MASK 0x3
125 #define RINC 0x4
126
127 //      definition for spin latch implementation
128
129 // exclusive is set for write access
130 // share is count of read accessors
131 // grant write lock when share == 0
132
133 volatile typedef struct {
134         ushort exclusive:1;
135         ushort pending:1;
136         ushort share:14;
137 } BtSpinLatch;
138
139 #define XCL 1
140 #define PEND 2
141 #define BOTH 3
142 #define SHARE 4
143
144 //  hash table entries
145
146 typedef struct {
147         volatile uint slot;             // Latch table entry at head of chain
148         BtSpinLatch latch[1];
149 } BtHashEntry;
150
151 //      latch manager table structure
152
153 typedef struct {
154         uid page_no;                    // latch set page number
155         RWLock readwr[1];               // read/write page lock
156         RWLock access[1];               // Access Intent/Page delete
157         WOLock parent[1];               // Posting of fence key in parent
158         WOLock atomic[1];               // Atomic update in progress
159         uint split;                             // right split page atomic insert
160         uint entry;                             // entry slot in latch table
161         uint next;                              // next entry in hash table chain
162         uint prev;                              // prev entry in hash table chain
163         volatile ushort pin;    // number of outstanding threads
164         ushort dirty:1;                 // page in cache is dirty
165 } BtLatchSet;
166
167 //      Define the length of the page record numbers
168
169 #define BtId 6
170
171 //      Page key slot definition.
172
173 //      Keys are marked dead, but remain on the page until
174 //      it cleanup is called. The fence key (highest key) for
175 //      a leaf page is always present, even after cleanup.
176
177 //      Slot types
178
179 //      In addition to the Unique keys that occupy slots
180 //      there are Librarian and Duplicate key
181 //      slots occupying the key slot array.
182
183 //      The Librarian slots are dead keys that
184 //      serve as filler, available to add new Unique
185 //      or Dup slots that are inserted into the B-tree.
186
187 //      The Duplicate slots have had their key bytes extended
188 //      by 6 bytes to contain a binary duplicate key uniqueifier.
189
190 typedef enum {
191         Unique,
192         Librarian,
193         Duplicate,
194         Delete,
195         Update
196 } BtSlotType;
197
198 typedef struct {
199         uint off:BT_maxbits;    // page offset for key start
200         uint type:3;                    // type of slot
201         uint dead:1;                    // set for deleted slot
202 } BtSlot;
203
204 //      The key structure occupies space at the upper end of
205 //      each page.  It's a length byte followed by the key
206 //      bytes.
207
208 typedef struct {
209         unsigned char len;              // this can be changed to a ushort or uint
210         unsigned char key[0];
211 } BtKey;
212
213 //      the value structure also occupies space at the upper
214 //      end of the page. Each key is immediately followed by a value.
215
216 typedef struct {
217         unsigned char len;              // this can be changed to a ushort or uint
218         unsigned char value[0];
219 } BtVal;
220
221 #define BT_maxkey       255             // maximum number of bytes in a key
222 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
223
224 //      The first part of an index page.
225 //      It is immediately followed
226 //      by the BtSlot array of keys.
227
228 //      note that this structure size
229 //      must be a multiple of 8 bytes
230 //      in order to place dups correctly.
231
232 typedef struct BtPage_ {
233         uint cnt;                                       // count of keys in page
234         uint act;                                       // count of active keys
235         uint min;                                       // next key offset
236         uint garbage;                           // page garbage in bytes
237         unsigned char bits:7;           // page size in bits
238         unsigned char free:1;           // page is on free chain
239         unsigned char lvl:7;            // level of page
240         unsigned char kill:1;           // page is being deleted
241         unsigned char right[BtId];      // page number to right
242         unsigned char left[BtId];       // page number to left
243         unsigned char filler[2];        // padding to multiple of 8
244 } *BtPage;
245
246 //  The loadpage interface object
247
248 typedef struct {
249         BtPage page;            // current page pointer
250         BtLatchSet *latch;      // current page latch set
251 } BtPageSet;
252
253 //      structure for latch manager on ALLOC_page
254
255 typedef struct {
256         struct BtPage_ alloc[1];        // next page_no in right ptr
257         unsigned long long dups[1];     // global duplicate key uniqueifier
258         unsigned char chain[BtId];      // head of free page_nos chain
259 } BtPageZero;
260
261 //      The object structure for Btree access
262
263 typedef struct {
264         uint page_size;                         // page size    
265         uint page_bits;                         // page size in bits    
266 #ifdef unix
267         int idx;
268 #else
269         HANDLE idx;
270 #endif
271         BtPageZero *pagezero;           // mapped allocation page
272         BtSpinLatch lock[1];            // allocation area lite latch
273         uint latchdeployed;                     // highest number of latch entries deployed
274         uint nlatchpage;                        // number of latch pages at BT_latch
275         uint latchtotal;                        // number of page latch entries
276         uint latchhash;                         // number of latch hash table slots
277         uint latchvictim;                       // next latch entry to examine
278         ushort thread_no[1];            // next thread number
279         BtHashEntry *hashtable;         // the buffer pool hash table entries
280         BtLatchSet *latchsets;          // mapped latch set from buffer pool
281         unsigned char *pagepool;        // mapped to the buffer pool pages
282 #ifndef unix
283         HANDLE halloc;                          // allocation handle
284         HANDLE hpool;                           // buffer pool handle
285 #endif
286 } BtMgr;
287
288 typedef struct {
289         BtMgr *mgr;                             // buffer manager for thread
290         BtPage cursor;                  // cached frame for start/next (never mapped)
291         BtPage frame;                   // spare frame for the page split (never mapped)
292         uid cursor_page;                                // current cursor page number   
293         unsigned char *mem;                             // frame, cursor, page memory buffer
294         unsigned char key[BT_keyarray]; // last found complete key
295         int found;                              // last delete or insert was found
296         int err;                                // last error
297         int reads, writes;              // number of reads and writes from the btree
298         ushort thread_no;               // thread number
299 } BtDb;
300
301 typedef enum {
302         BTERR_ok = 0,
303         BTERR_struct,
304         BTERR_ovflw,
305         BTERR_lock,
306         BTERR_map,
307         BTERR_read,
308         BTERR_wrt,
309         BTERR_atomic
310 } BTERR;
311
312 #define CLOCK_bit 0x8000
313
314 // B-Tree functions
315 extern void bt_close (BtDb *bt);
316 extern BtDb *bt_open (BtMgr *mgr);
317 extern BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint len, uint lvl, void *value, uint vallen, uint update);
318 extern BTERR  bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl);
319 extern int bt_findkey    (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
320 extern BtKey *bt_foundkey (BtDb *bt);
321 extern uint bt_startkey  (BtDb *bt, unsigned char *key, uint len);
322 extern uint bt_nextkey   (BtDb *bt, uint slot);
323
324 //      manager functions
325 extern BtMgr *bt_mgr (char *name, uint bits, uint poolsize);
326 void bt_mgrclose (BtMgr *mgr);
327
328 //  Helper functions to return slot values
329 //      from the cursor page.
330
331 extern BtKey *bt_key (BtDb *bt, uint slot);
332 extern BtVal *bt_val (BtDb *bt, uint slot);
333
334 //  The page is allocated from low and hi ends.
335 //  The key slots are allocated from the bottom,
336 //      while the text and value of the key
337 //  are allocated from the top.  When the two
338 //  areas meet, the page is split into two.
339
340 //  A key consists of a length byte, two bytes of
341 //  index number (0 - 65535), and up to 253 bytes
342 //  of key value.
343
344 //  Associated with each key is a value byte string
345 //      containing any value desired.
346
347 //  The b-tree root is always located at page 1.
348 //      The first leaf page of level zero is always
349 //      located on page 2.
350
351 //      The b-tree pages are linked with next
352 //      pointers to facilitate enumerators,
353 //      and provide for concurrency.
354
355 //      When to root page fills, it is split in two and
356 //      the tree height is raised by a new root at page
357 //      one with two keys.
358
359 //      Deleted keys are marked with a dead bit until
360 //      page cleanup. The fence key for a leaf node is
361 //      always present
362
363 //  To achieve maximum concurrency one page is locked at a time
364 //  as the tree is traversed to find leaf key in question. The right
365 //  page numbers are used in cases where the page is being split,
366 //      or consolidated.
367
368 //  Page 0 is dedicated to lock for new page extensions,
369 //      and chains empty pages together for reuse. It also
370 //      contains the latch manager hash table.
371
372 //      The ParentModification lock on a node is obtained to serialize posting
373 //      or changing the fence key for a node.
374
375 //      Empty pages are chained together through the ALLOC page and reused.
376
377 //      Access macros to address slot and key values from the page
378 //      Page slots use 1 based indexing.
379
380 #define slotptr(page, slot) (((BtSlot *)(page+1)) + (slot-1))
381 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
382 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
383
384 void bt_putid(unsigned char *dest, uid id)
385 {
386 int i = BtId;
387
388         while( i-- )
389                 dest[i] = (unsigned char)id, id >>= 8;
390 }
391
392 uid bt_getid(unsigned char *src)
393 {
394 uid id = 0;
395 int i;
396
397         for( i = 0; i < BtId; i++ )
398                 id <<= 8, id |= *src++; 
399
400         return id;
401 }
402
403 uid bt_newdup (BtDb *bt)
404 {
405 #ifdef unix
406         return __sync_fetch_and_add (bt->mgr->pagezero->dups, 1) + 1;
407 #else
408         return _InterlockedIncrement64(bt->mgr->pagezero->dups, 1);
409 #endif
410 }
411
412 //      Write-Only Queue Lock
413
414 void WriteOLock (WOLock *lock)
415 {
416 ushort tix;
417 #ifdef unix
418         tix = __sync_fetch_and_add (lock->ticket, 1);
419 #else
420         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
421 #endif
422         // wait for our ticket to come up
423
424         while( tix != lock->serving[0] )
425 #ifdef unix
426                 sched_yield();
427 #else
428                 SwitchToThread ();
429 #endif
430 }
431
432 void WriteORelease (WOLock *lock)
433 {
434         lock->serving[0]++;
435 }
436
437 //      Phase-Fair reader/writer lock implementation
438
439 void WriteLock (RWLock *lock)
440 {
441 ushort w, r, tix;
442
443 #ifdef unix
444         tix = __sync_fetch_and_add (lock->ticket, 1);
445 #else
446         tix = _InterlockedExchangeAdd16 (lock->ticket, 1);
447 #endif
448         // wait for our ticket to come up
449
450         while( tix != lock->serving[0] )
451 #ifdef unix
452                 sched_yield();
453 #else
454                 SwitchToThread ();
455 #endif
456
457         w = PRES | (tix & PHID);
458 #ifdef  unix
459         r = __sync_fetch_and_add (lock->rin, w);
460 #else
461         r = _InterlockedExchangeAdd16 (lock->rin, w);
462 #endif
463         while( r != *lock->rout )
464 #ifdef unix
465                 sched_yield();
466 #else
467                 SwitchToThread();
468 #endif
469 }
470
471 void WriteRelease (RWLock *lock)
472 {
473 #ifdef unix
474         __sync_fetch_and_and (lock->rin, ~MASK);
475 #else
476         _InterlockedAnd16 (lock->rin, ~MASK);
477 #endif
478         lock->serving[0]++;
479 }
480
481 void ReadLock (RWLock *lock)
482 {
483 ushort w;
484 #ifdef unix
485         w = __sync_fetch_and_add (lock->rin, RINC) & MASK;
486 #else
487         w = _InterlockedExchangeAdd16 (lock->rin, RINC) & MASK;
488 #endif
489         if( w )
490           while( w == (*lock->rin & MASK) )
491 #ifdef unix
492                 sched_yield ();
493 #else
494                 SwitchToThread ();
495 #endif
496 }
497
498 void ReadRelease (RWLock *lock)
499 {
500 #ifdef unix
501         __sync_fetch_and_add (lock->rout, RINC);
502 #else
503         _InterlockedExchangeAdd16 (lock->rout, RINC);
504 #endif
505 }
506
507 //      Spin Latch Manager
508
509 //      wait until write lock mode is clear
510 //      and add 1 to the share count
511
512 void bt_spinreadlock(BtSpinLatch *latch)
513 {
514 ushort prev;
515
516   do {
517 #ifdef unix
518         prev = __sync_fetch_and_add ((ushort *)latch, SHARE);
519 #else
520         prev = _InterlockedExchangeAdd16((ushort *)latch, SHARE);
521 #endif
522         //  see if exclusive request is granted or pending
523
524         if( !(prev & BOTH) )
525                 return;
526 #ifdef unix
527         prev = __sync_fetch_and_add ((ushort *)latch, -SHARE);
528 #else
529         prev = _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
530 #endif
531 #ifdef  unix
532   } while( sched_yield(), 1 );
533 #else
534   } while( SwitchToThread(), 1 );
535 #endif
536 }
537
538 //      wait for other read and write latches to relinquish
539
540 void bt_spinwritelock(BtSpinLatch *latch)
541 {
542 ushort prev;
543
544   do {
545 #ifdef  unix
546         prev = __sync_fetch_and_or((ushort *)latch, PEND | XCL);
547 #else
548         prev = _InterlockedOr16((ushort *)latch, PEND | XCL);
549 #endif
550         if( !(prev & XCL) )
551           if( !(prev & ~BOTH) )
552                 return;
553           else
554 #ifdef unix
555                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
556 #else
557                 _InterlockedAnd16((ushort *)latch, ~XCL);
558 #endif
559 #ifdef  unix
560   } while( sched_yield(), 1 );
561 #else
562   } while( SwitchToThread(), 1 );
563 #endif
564 }
565
566 //      try to obtain write lock
567
568 //      return 1 if obtained,
569 //              0 otherwise
570
571 int bt_spinwritetry(BtSpinLatch *latch)
572 {
573 ushort prev;
574
575 #ifdef  unix
576         prev = __sync_fetch_and_or((ushort *)latch, XCL);
577 #else
578         prev = _InterlockedOr16((ushort *)latch, XCL);
579 #endif
580         //      take write access if all bits are clear
581
582         if( !(prev & XCL) )
583           if( !(prev & ~BOTH) )
584                 return 1;
585           else
586 #ifdef unix
587                 __sync_fetch_and_and ((ushort *)latch, ~XCL);
588 #else
589                 _InterlockedAnd16((ushort *)latch, ~XCL);
590 #endif
591         return 0;
592 }
593
594 //      clear write mode
595
596 void bt_spinreleasewrite(BtSpinLatch *latch)
597 {
598 #ifdef unix
599         __sync_fetch_and_and((ushort *)latch, ~BOTH);
600 #else
601         _InterlockedAnd16((ushort *)latch, ~BOTH);
602 #endif
603 }
604
605 //      decrement reader count
606
607 void bt_spinreleaseread(BtSpinLatch *latch)
608 {
609 #ifdef unix
610         __sync_fetch_and_add((ushort *)latch, -SHARE);
611 #else
612         _InterlockedExchangeAdd16((ushort *)latch, -SHARE);
613 #endif
614 }
615
616 //      read page from permanent location in Btree file
617
618 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no)
619 {
620 off64_t off = page_no << mgr->page_bits;
621
622 #ifdef unix
623         if( pread (mgr->idx, page, mgr->page_size, page_no << mgr->page_bits) < mgr->page_size ) {
624                 fprintf (stderr, "Unable to read page %.8x errno = %d\n", page_no, errno);
625                 return BTERR_read;
626         }
627 #else
628 OVERLAPPED ovl[1];
629 uint amt[1];
630
631         memset (ovl, 0, sizeof(OVERLAPPED));
632         ovl->Offset = off;
633         ovl->OffsetHigh = off >> 32;
634
635         if( !ReadFile(mgr->idx, page, mgr->page_size, amt, ovl)) {
636                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
637                 return BTERR_read;
638         }
639         if( *amt <  mgr->page_size ) {
640                 fprintf (stderr, "Unable to read page %.8x GetLastError = %d\n", page_no, GetLastError());
641                 return BTERR_read;
642         }
643 #endif
644         return 0;
645 }
646
647 //      write page to permanent location in Btree file
648 //      clear the dirty bit
649
650 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no)
651 {
652 off64_t off = page_no << mgr->page_bits;
653
654 #ifdef unix
655         if( pwrite(mgr->idx, page, mgr->page_size, off) < mgr->page_size )
656                 return BTERR_wrt;
657 #else
658 OVERLAPPED ovl[1];
659 uint amt[1];
660
661         memset (ovl, 0, sizeof(OVERLAPPED));
662         ovl->Offset = off;
663         ovl->OffsetHigh = off >> 32;
664
665         if( !WriteFile(mgr->idx, page, mgr->page_size, amt, ovl) )
666                 return BTERR_wrt;
667
668         if( *amt <  mgr->page_size )
669                 return BTERR_wrt;
670 #endif
671         return 0;
672 }
673
674 //      link latch table entry into head of latch hash table
675
676 BTERR bt_latchlink (BtDb *bt, uint hashidx, uint slot, uid page_no, uint loadit)
677 {
678 BtPage page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
679 BtLatchSet *latch = bt->mgr->latchsets + slot;
680
681         if( latch->next = bt->mgr->hashtable[hashidx].slot )
682                 bt->mgr->latchsets[latch->next].prev = slot;
683
684         bt->mgr->hashtable[hashidx].slot = slot;
685         latch->page_no = page_no;
686         latch->entry = slot;
687         latch->split = 0;
688         latch->prev = 0;
689         latch->pin = 1;
690
691         if( loadit )
692           if( bt->err = bt_readpage (bt->mgr, page, page_no) )
693                 return bt->err;
694           else
695                 bt->reads++;
696
697         return bt->err = 0;
698 }
699
700 //      set CLOCK bit in latch
701 //      decrement pin count
702
703 void bt_unpinlatch (BtLatchSet *latch)
704 {
705 #ifdef unix
706         if( ~latch->pin & CLOCK_bit )
707                 __sync_fetch_and_or(&latch->pin, CLOCK_bit);
708         __sync_fetch_and_add(&latch->pin, -1);
709 #else
710         if( ~latch->pin & CLOCK_bit )
711                 _InterlockedOr16 (&latch->pin, CLOCK_bit);
712         _InterlockedDecrement16 (&latch->pin);
713 #endif
714 }
715
716 //  return the btree cached page address
717
718 BtPage bt_mappage (BtDb *bt, BtLatchSet *latch)
719 {
720 BtPage page = (BtPage)(((uid)latch->entry << bt->mgr->page_bits) + bt->mgr->pagepool);
721
722         return page;
723 }
724
725 //      find existing latchset or inspire new one
726 //      return with latchset pinned
727
728 BtLatchSet *bt_pinlatch (BtDb *bt, uid page_no, uint loadit)
729 {
730 uint hashidx = page_no % bt->mgr->latchhash;
731 BtLatchSet *latch;
732 uint slot, idx;
733 uint lvl, cnt;
734 off64_t off;
735 uint amt[1];
736 BtPage page;
737
738   //  try to find our entry
739
740   bt_spinwritelock(bt->mgr->hashtable[hashidx].latch);
741
742   if( slot = bt->mgr->hashtable[hashidx].slot ) do
743   {
744         latch = bt->mgr->latchsets + slot;
745         if( page_no == latch->page_no )
746                 break;
747   } while( slot = latch->next );
748
749   //  found our entry
750   //  increment clock
751
752   if( slot ) {
753         latch = bt->mgr->latchsets + slot;
754 #ifdef unix
755         __sync_fetch_and_add(&latch->pin, 1);
756 #else
757         _InterlockedIncrement16 (&latch->pin);
758 #endif
759         bt_spinreleasewrite(bt->mgr->hashtable[hashidx].latch);
760         return latch;
761   }
762
763         //  see if there are any unused pool entries
764 #ifdef unix
765         slot = __sync_fetch_and_add (&bt->mgr->latchdeployed, 1) + 1;
766 #else
767         slot = _InterlockedIncrement (&bt->mgr->latchdeployed);
768 #endif
769
770         if( slot < bt->mgr->latchtotal ) {
771                 latch = bt->mgr->latchsets + slot;
772                 if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
773                         return NULL;
774                 bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
775                 return latch;
776         }
777
778 #ifdef unix
779         __sync_fetch_and_add (&bt->mgr->latchdeployed, -1);
780 #else
781         _InterlockedDecrement (&bt->mgr->latchdeployed);
782 #endif
783   //  find and reuse previous entry on victim
784
785   while( 1 ) {
786 #ifdef unix
787         slot = __sync_fetch_and_add(&bt->mgr->latchvictim, 1);
788 #else
789         slot = _InterlockedIncrement (&bt->mgr->latchvictim) - 1;
790 #endif
791         // try to get write lock on hash chain
792         //      skip entry if not obtained
793         //      or has outstanding pins
794
795         slot %= bt->mgr->latchtotal;
796
797         if( !slot )
798                 continue;
799
800         latch = bt->mgr->latchsets + slot;
801         idx = latch->page_no % bt->mgr->latchhash;
802
803         //      see we are on same chain as hashidx
804
805         if( idx == hashidx )
806                 continue;
807
808         if( !bt_spinwritetry (bt->mgr->hashtable[idx].latch) )
809                 continue;
810
811         //  skip this slot if it is pinned
812         //      or the CLOCK bit is set
813
814         if( latch->pin ) {
815           if( latch->pin & CLOCK_bit ) {
816 #ifdef unix
817                 __sync_fetch_and_and(&latch->pin, ~CLOCK_bit);
818 #else
819                 _InterlockedAnd16 (&latch->pin, ~CLOCK_bit);
820 #endif
821           }
822           bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
823           continue;
824         }
825
826         //  update permanent page area in btree from buffer pool
827
828         page = (BtPage)(((uid)slot << bt->mgr->page_bits) + bt->mgr->pagepool);
829
830         if( latch->dirty )
831           if( bt->err = bt_writepage (bt->mgr, page, latch->page_no) )
832                 return NULL;
833           else
834                 latch->dirty = 0, bt->writes++;
835
836         //  unlink our available slot from its hash chain
837
838         if( latch->prev )
839                 bt->mgr->latchsets[latch->prev].next = latch->next;
840         else
841                 bt->mgr->hashtable[idx].slot = latch->next;
842
843         if( latch->next )
844                 bt->mgr->latchsets[latch->next].prev = latch->prev;
845
846         bt_spinreleasewrite (bt->mgr->hashtable[idx].latch);
847
848         if( bt_latchlink (bt, hashidx, slot, page_no, loadit) )
849                 return NULL;
850
851         bt_spinreleasewrite (bt->mgr->hashtable[hashidx].latch);
852         return latch;
853   }
854 }
855
856 void bt_mgrclose (BtMgr *mgr)
857 {
858 BtLatchSet *latch;
859 uint num = 0;
860 BtPage page;
861 uint slot;
862
863         //      flush dirty pool pages to the btree
864
865         for( slot = 1; slot <= mgr->latchdeployed; slot++ ) {
866                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
867                 latch = mgr->latchsets + slot;
868
869                 if( latch->dirty ) {
870                         bt_writepage(mgr, page, latch->page_no);
871                         latch->dirty = 0, num++;
872                 }
873 //              madvise (page, mgr->page_size, MADV_DONTNEED);
874         }
875
876         fprintf(stderr, "%d buffer pool pages flushed\n", num);
877
878 #ifdef unix
879         munmap (mgr->hashtable, (uid)mgr->nlatchpage << mgr->page_bits);
880         munmap (mgr->pagezero, mgr->page_size);
881 #else
882         FlushViewOfFile(mgr->pagezero, 0);
883         UnmapViewOfFile(mgr->pagezero);
884         UnmapViewOfFile(mgr->hashtable);
885         CloseHandle(mgr->halloc);
886         CloseHandle(mgr->hpool);
887 #endif
888 #ifdef unix
889         close (mgr->idx);
890         free (mgr);
891 #else
892         FlushFileBuffers(mgr->idx);
893         CloseHandle(mgr->idx);
894         GlobalFree (mgr);
895 #endif
896 }
897
898 //      close and release memory
899
900 void bt_close (BtDb *bt)
901 {
902 #ifdef unix
903         if( bt->mem )
904                 free (bt->mem);
905 #else
906         if( bt->mem)
907                 VirtualFree (bt->mem, 0, MEM_RELEASE);
908 #endif
909         free (bt);
910 }
911
912 //  open/create new btree buffer manager
913
914 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
915 //              size of page pool (e.g. 262144)
916
917 BtMgr *bt_mgr (char *name, uint bits, uint nodemax)
918 {
919 uint lvl, attr, last, slot, idx;
920 uint nlatchpage, latchhash;
921 unsigned char value[BtId];
922 int flag, initit = 0;
923 BtPageZero *pagezero;
924 off64_t size;
925 uint amt[1];
926 BtMgr* mgr;
927 BtKey* key;
928 BtVal *val;
929
930         // determine sanity of page size and buffer pool
931
932         if( bits > BT_maxbits )
933                 bits = BT_maxbits;
934         else if( bits < BT_minbits )
935                 bits = BT_minbits;
936
937         if( nodemax < 16 ) {
938                 fprintf(stderr, "Buffer pool too small: %d\n", nodemax);
939                 return NULL;
940         }
941
942 #ifdef unix
943         mgr = calloc (1, sizeof(BtMgr));
944
945         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
946
947         if( mgr->idx == -1 ) {
948                 fprintf (stderr, "Unable to open btree file\n");
949                 return free(mgr), NULL;
950         }
951 #else
952         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
953         attr = FILE_ATTRIBUTE_NORMAL;
954         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
955
956         if( mgr->idx == INVALID_HANDLE_VALUE )
957                 return GlobalFree(mgr), NULL;
958 #endif
959
960 #ifdef unix
961         pagezero = valloc (BT_maxpage);
962         *amt = 0;
963
964         // read minimum page size to get root info
965         //      to support raw disk partition files
966         //      check if bits == 0 on the disk.
967
968         if( size = lseek (mgr->idx, 0L, 2) )
969                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
970                         if( pagezero->alloc->bits )
971                                 bits = pagezero->alloc->bits;
972                         else
973                                 initit = 1;
974                 else
975                         return free(mgr), free(pagezero), NULL;
976         else
977                 initit = 1;
978 #else
979         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
980         size = GetFileSize(mgr->idx, amt);
981
982         if( size || *amt ) {
983                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
984                         return bt_mgrclose (mgr), NULL;
985                 bits = pagezero->alloc->bits;
986         } else
987                 initit = 1;
988 #endif
989
990         mgr->page_size = 1 << bits;
991         mgr->page_bits = bits;
992
993         //  calculate number of latch hash table entries
994
995         mgr->nlatchpage = (nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) / mgr->page_size;
996         mgr->latchhash = ((uid)mgr->nlatchpage << mgr->page_bits) / sizeof(BtHashEntry);
997
998         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
999         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1)/mgr->page_size;
1000         mgr->latchtotal = nodemax;
1001
1002         if( !initit )
1003                 goto mgrlatch;
1004
1005         // initialize an empty b-tree with latch page, root page, page of leaves
1006         // and page(s) of latches and page pool cache
1007
1008         memset (pagezero, 0, 1 << bits);
1009         pagezero->alloc->bits = mgr->page_bits;
1010         bt_putid(pagezero->alloc->right, MIN_lvl+1);
1011
1012         //  initialize left-most LEAF page in
1013         //      alloc->left.
1014
1015         bt_putid (pagezero->alloc->left, LEAF_page);
1016
1017         if( bt_writepage (mgr, pagezero->alloc, 0) ) {
1018                 fprintf (stderr, "Unable to create btree page zero\n");
1019                 return bt_mgrclose (mgr), NULL;
1020         }
1021
1022         memset (pagezero, 0, 1 << bits);
1023         pagezero->alloc->bits = mgr->page_bits;
1024
1025         for( lvl=MIN_lvl; lvl--; ) {
1026                 slotptr(pagezero->alloc, 1)->off = mgr->page_size - 3 - (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1027                 key = keyptr(pagezero->alloc, 1);
1028                 key->len = 2;           // create stopper key
1029                 key->key[0] = 0xff;
1030                 key->key[1] = 0xff;
1031
1032                 bt_putid(value, MIN_lvl - lvl + 1);
1033                 val = valptr(pagezero->alloc, 1);
1034                 val->len = lvl ? BtId : 0;
1035                 memcpy (val->value, value, val->len);
1036
1037                 pagezero->alloc->min = slotptr(pagezero->alloc, 1)->off;
1038                 pagezero->alloc->lvl = lvl;
1039                 pagezero->alloc->cnt = 1;
1040                 pagezero->alloc->act = 1;
1041
1042                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl) ) {
1043                         fprintf (stderr, "Unable to create btree page zero\n");
1044                         return bt_mgrclose (mgr), NULL;
1045                 }
1046         }
1047
1048 mgrlatch:
1049 #ifdef unix
1050         free (pagezero);
1051 #else
1052         VirtualFree (pagezero, 0, MEM_RELEASE);
1053 #endif
1054 #ifdef unix
1055         // mlock the pagezero page
1056
1057         flag = PROT_READ | PROT_WRITE;
1058         mgr->pagezero = mmap (0, mgr->page_size, flag, MAP_SHARED, mgr->idx, ALLOC_page << mgr->page_bits);
1059         if( mgr->pagezero == MAP_FAILED ) {
1060                 fprintf (stderr, "Unable to mmap btree page zero, error = %d\n", errno);
1061                 return bt_mgrclose (mgr), NULL;
1062         }
1063         mlock (mgr->pagezero, mgr->page_size);
1064
1065         mgr->hashtable = (void *)mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1066         if( mgr->hashtable == MAP_FAILED ) {
1067                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1068                 return bt_mgrclose (mgr), NULL;
1069         }
1070 #else
1071         flag = PAGE_READWRITE;
1072         mgr->halloc = CreateFileMapping(mgr->idx, NULL, flag, 0, mgr->page_size, NULL);
1073         if( !mgr->halloc ) {
1074                 fprintf (stderr, "Unable to create page zero memory mapping, error = %d\n", GetLastError());
1075                 return bt_mgrclose (mgr), NULL;
1076         }
1077
1078         flag = FILE_MAP_WRITE;
1079         mgr->pagezero = MapViewOfFile(mgr->halloc, flag, 0, 0, mgr->page_size);
1080         if( !mgr->pagezero ) {
1081                 fprintf (stderr, "Unable to map page zero, error = %d\n", GetLastError());
1082                 return bt_mgrclose (mgr), NULL;
1083         }
1084
1085         flag = PAGE_READWRITE;
1086         size = (uid)mgr->nlatchpage << mgr->page_bits;
1087         mgr->hpool = CreateFileMapping(INVALID_HANDLE_VALUE, NULL, flag, size >> 32, size, NULL);
1088         if( !mgr->hpool ) {
1089                 fprintf (stderr, "Unable to create buffer pool memory mapping, error = %d\n", GetLastError());
1090                 return bt_mgrclose (mgr), NULL;
1091         }
1092
1093         flag = FILE_MAP_WRITE;
1094         mgr->hashtable = MapViewOfFile(mgr->pool, flag, 0, 0, size);
1095         if( !mgr->hashtable ) {
1096                 fprintf (stderr, "Unable to map buffer pool, error = %d\n", GetLastError());
1097                 return bt_mgrclose (mgr), NULL;
1098         }
1099 #endif
1100
1101         mgr->pagepool = (unsigned char *)mgr->hashtable + ((uid)(mgr->nlatchpage - mgr->latchtotal) << mgr->page_bits);
1102         mgr->latchsets = (BtLatchSet *)(mgr->pagepool - (uid)mgr->latchtotal * sizeof(BtLatchSet));
1103
1104         return mgr;
1105 }
1106
1107 //      open BTree access method
1108 //      based on buffer manager
1109
1110 BtDb *bt_open (BtMgr *mgr)
1111 {
1112 BtDb *bt = malloc (sizeof(*bt));
1113
1114         memset (bt, 0, sizeof(*bt));
1115         bt->mgr = mgr;
1116 #ifdef unix
1117         bt->mem = valloc (2 *mgr->page_size);
1118 #else
1119         bt->mem = VirtualAlloc(NULL, 2 * mgr->page_size, MEM_COMMIT, PAGE_READWRITE);
1120 #endif
1121         bt->frame = (BtPage)bt->mem;
1122         bt->cursor = (BtPage)(bt->mem + 1 * mgr->page_size);
1123 #ifdef unix
1124         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1125 #else
1126         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1127 #endif
1128         return bt;
1129 }
1130
1131 //  compare two keys, return > 0, = 0, or < 0
1132 //  =0: keys are same
1133 //  -1: key2 > key1
1134 //  +1: key2 < key1
1135 //  as the comparison value
1136
1137 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1138 {
1139 uint len1 = key1->len;
1140 int ans;
1141
1142         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1143                 return ans;
1144
1145         if( len1 > len2 )
1146                 return 1;
1147         if( len1 < len2 )
1148                 return -1;
1149
1150         return 0;
1151 }
1152
1153 // place write, read, or parent lock on requested page_no.
1154
1155 void bt_lockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1156 {
1157         switch( mode ) {
1158         case BtLockRead:
1159                 ReadLock (latch->readwr);
1160                 break;
1161         case BtLockWrite:
1162                 WriteLock (latch->readwr);
1163                 break;
1164         case BtLockAccess:
1165                 ReadLock (latch->access);
1166                 break;
1167         case BtLockDelete:
1168                 WriteLock (latch->access);
1169                 break;
1170         case BtLockParent:
1171                 WriteOLock (latch->parent);
1172                 break;
1173         case BtLockAtomic:
1174                 WriteOLock (latch->atomic);
1175                 break;
1176         }
1177 }
1178
1179 // remove write, read, or parent lock on requested page
1180
1181 void bt_unlockpage(BtDb *bt, BtLock mode, BtLatchSet *latch)
1182 {
1183         switch( mode ) {
1184         case BtLockRead:
1185                 ReadRelease (latch->readwr);
1186                 break;
1187         case BtLockWrite:
1188                 WriteRelease (latch->readwr);
1189                 break;
1190         case BtLockAccess:
1191                 ReadRelease (latch->access);
1192                 break;
1193         case BtLockDelete:
1194                 WriteRelease (latch->access);
1195                 break;
1196         case BtLockParent:
1197                 WriteORelease (latch->parent);
1198                 break;
1199         case BtLockAtomic:
1200                 WriteORelease (latch->atomic);
1201                 break;
1202         }
1203 }
1204
1205 //      allocate a new page
1206 //      return with page latched, but unlocked.
1207
1208 int bt_newpage(BtDb *bt, BtPageSet *set, BtPage contents)
1209 {
1210 uid page_no;
1211 int blk;
1212
1213         //      lock allocation page
1214
1215         bt_spinwritelock(bt->mgr->lock);
1216
1217         // use empty chain first
1218         // else allocate empty page
1219
1220         if( page_no = bt_getid(bt->mgr->pagezero->chain) ) {
1221                 if( set->latch = bt_pinlatch (bt, page_no, 1) )
1222                         set->page = bt_mappage (bt, set->latch);
1223                 else
1224                         return bt->err = BTERR_struct, -1;
1225
1226                 bt_putid(bt->mgr->pagezero->chain, bt_getid(set->page->right));
1227                 bt_spinreleasewrite(bt->mgr->lock);
1228
1229                 memcpy (set->page, contents, bt->mgr->page_size);
1230                 set->latch->dirty = 1;
1231                 return 0;
1232         }
1233
1234         page_no = bt_getid(bt->mgr->pagezero->alloc->right);
1235         bt_putid(bt->mgr->pagezero->alloc->right, page_no+1);
1236
1237         // unlock allocation latch
1238
1239         bt_spinreleasewrite(bt->mgr->lock);
1240
1241         //      don't load cache from btree page
1242
1243         if( set->latch = bt_pinlatch (bt, page_no, 0) )
1244                 set->page = bt_mappage (bt, set->latch);
1245         else
1246                 return bt->err = BTERR_struct;
1247
1248         memcpy (set->page, contents, bt->mgr->page_size);
1249         set->latch->dirty = 1;
1250         return 0;
1251 }
1252
1253 //  find slot in page for given key at a given level
1254
1255 int bt_findslot (BtPage page, unsigned char *key, uint len)
1256 {
1257 uint diff, higher = page->cnt, low = 1, slot;
1258 uint good = 0;
1259
1260         //        make stopper key an infinite fence value
1261
1262         if( bt_getid (page->right) )
1263                 higher++;
1264         else
1265                 good++;
1266
1267         //      low is the lowest candidate.
1268         //  loop ends when they meet
1269
1270         //  higher is already
1271         //      tested as .ge. the passed key.
1272
1273         while( diff = higher - low ) {
1274                 slot = low + ( diff >> 1 );
1275                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1276                         low = slot + 1;
1277                 else
1278                         higher = slot, good++;
1279         }
1280
1281         //      return zero if key is on right link page
1282
1283         return good ? higher : 0;
1284 }
1285
1286 //  find and load page at given level for given key
1287 //      leave page rd or wr locked as requested
1288
1289 int bt_loadpage (BtDb *bt, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock)
1290 {
1291 uid page_no = ROOT_page, prevpage = 0;
1292 uint drill = 0xff, slot;
1293 BtLatchSet *prevlatch;
1294 uint mode, prevmode;
1295
1296   //  start at root of btree and drill down
1297
1298   do {
1299         // determine lock mode of drill level
1300         mode = (drill == lvl) ? lock : BtLockRead; 
1301
1302         if( !(set->latch = bt_pinlatch (bt, page_no, 1)) )
1303           return 0;
1304
1305         // obtain access lock using lock chaining with Access mode
1306
1307         if( page_no > ROOT_page )
1308           bt_lockpage(bt, BtLockAccess, set->latch);
1309
1310         set->page = bt_mappage (bt, set->latch);
1311
1312         //      release & unpin parent or left sibling page
1313
1314         if( prevpage ) {
1315           bt_unlockpage(bt, prevmode, prevlatch);
1316           bt_unpinlatch (prevlatch);
1317           prevpage = 0;
1318         }
1319
1320         // obtain mode lock using lock chaining through AccessLock
1321
1322         bt_lockpage(bt, mode, set->latch);
1323
1324         if( set->page->free )
1325                 return bt->err = BTERR_struct, 0;
1326
1327         if( page_no > ROOT_page )
1328           bt_unlockpage(bt, BtLockAccess, set->latch);
1329
1330         // re-read and re-lock root after determining actual level of root
1331
1332         if( set->page->lvl != drill) {
1333                 if( set->latch->page_no != ROOT_page )
1334                         return bt->err = BTERR_struct, 0;
1335                         
1336                 drill = set->page->lvl;
1337
1338                 if( lock != BtLockRead && drill == lvl ) {
1339                   bt_unlockpage(bt, mode, set->latch);
1340                   bt_unpinlatch (set->latch);
1341                   continue;
1342                 }
1343         }
1344
1345         prevpage = set->latch->page_no;
1346         prevlatch = set->latch;
1347         prevmode = mode;
1348
1349         //  find key on page at this level
1350         //  and descend to requested level
1351
1352         if( !set->page->kill )
1353          if( slot = bt_findslot (set->page, key, len) ) {
1354           if( drill == lvl )
1355                 return slot;
1356
1357           // find next non-dead slot -- the fence key if nothing else
1358
1359           while( slotptr(set->page, slot)->dead )
1360                 if( slot++ < set->page->cnt )
1361                   continue;
1362                 else
1363                   return bt->err = BTERR_struct, 0;
1364
1365           page_no = bt_getid(valptr(set->page, slot)->value);
1366           drill--;
1367           continue;
1368          }
1369
1370         //  or slide right into next page
1371
1372         page_no = bt_getid(set->page->right);
1373   } while( page_no );
1374
1375   // return error on end of right chain
1376
1377   bt->err = BTERR_struct;
1378   return 0;     // return error
1379 }
1380
1381 //      return page to free list
1382 //      page must be delete & write locked
1383
1384 void bt_freepage (BtDb *bt, BtPageSet *set)
1385 {
1386         //      lock allocation page
1387
1388         bt_spinwritelock (bt->mgr->lock);
1389
1390         //      store chain
1391
1392         memcpy(set->page->right, bt->mgr->pagezero->chain, BtId);
1393         bt_putid(bt->mgr->pagezero->chain, set->latch->page_no);
1394         set->latch->dirty = 1;
1395         set->page->free = 1;
1396
1397         // unlock released page
1398
1399         bt_unlockpage (bt, BtLockDelete, set->latch);
1400         bt_unlockpage (bt, BtLockWrite, set->latch);
1401         bt_unpinlatch (set->latch);
1402
1403         // unlock allocation page
1404
1405         bt_spinreleasewrite (bt->mgr->lock);
1406 }
1407
1408 //      a fence key was deleted from a page
1409 //      push new fence value upwards
1410
1411 BTERR bt_fixfence (BtDb *bt, BtPageSet *set, uint lvl)
1412 {
1413 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1414 unsigned char value[BtId];
1415 BtKey* ptr;
1416 uint idx;
1417
1418         //      remove the old fence value
1419
1420         ptr = keyptr(set->page, set->page->cnt);
1421         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1422         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1423         set->latch->dirty = 1;
1424
1425         //  cache new fence value
1426
1427         ptr = keyptr(set->page, set->page->cnt);
1428         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1429
1430         bt_lockpage (bt, BtLockParent, set->latch);
1431         bt_unlockpage (bt, BtLockWrite, set->latch);
1432
1433         //      insert new (now smaller) fence key
1434
1435         bt_putid (value, set->latch->page_no);
1436         ptr = (BtKey*)leftkey;
1437
1438         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1439           return bt->err;
1440
1441         //      now delete old fence key
1442
1443         ptr = (BtKey*)rightkey;
1444
1445         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1446                 return bt->err;
1447
1448         bt_unlockpage (bt, BtLockParent, set->latch);
1449         bt_unpinlatch(set->latch);
1450         return 0;
1451 }
1452
1453 //      root has a single child
1454 //      collapse a level from the tree
1455
1456 BTERR bt_collapseroot (BtDb *bt, BtPageSet *root)
1457 {
1458 BtPageSet child[1];
1459 uid page_no;
1460 uint idx;
1461
1462   // find the child entry and promote as new root contents
1463
1464   do {
1465         for( idx = 0; idx++ < root->page->cnt; )
1466           if( !slotptr(root->page, idx)->dead )
1467                 break;
1468
1469         page_no = bt_getid (valptr(root->page, idx)->value);
1470
1471         if( child->latch = bt_pinlatch (bt, page_no, 1) )
1472                 child->page = bt_mappage (bt, child->latch);
1473         else
1474                 return bt->err;
1475
1476         bt_lockpage (bt, BtLockDelete, child->latch);
1477         bt_lockpage (bt, BtLockWrite, child->latch);
1478
1479         memcpy (root->page, child->page, bt->mgr->page_size);
1480         root->latch->dirty = 1;
1481
1482         bt_freepage (bt, child);
1483
1484   } while( root->page->lvl > 1 && root->page->act == 1 );
1485
1486   bt_unlockpage (bt, BtLockWrite, root->latch);
1487   bt_unpinlatch (root->latch);
1488   return 0;
1489 }
1490
1491 //  delete a page and manage keys
1492 //  call with page writelocked
1493 //      returns with page unpinned
1494
1495 BTERR bt_deletepage (BtDb *bt, BtPageSet *set)
1496 {
1497 unsigned char lowerfence[BT_keyarray], higherfence[BT_keyarray];
1498 unsigned char value[BtId];
1499 uint lvl = set->page->lvl;
1500 BtPageSet right[1];
1501 uid page_no;
1502 BtKey *ptr;
1503
1504         //      cache copy of fence key
1505         //      to post in parent
1506
1507         ptr = keyptr(set->page, set->page->cnt);
1508         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1509
1510         //      obtain lock on right page
1511
1512         page_no = bt_getid(set->page->right);
1513
1514         if( right->latch = bt_pinlatch (bt, page_no, 1) )
1515                 right->page = bt_mappage (bt, right->latch);
1516         else
1517                 return 0;
1518
1519         bt_lockpage (bt, BtLockWrite, right->latch);
1520
1521         // cache copy of key to update
1522
1523         ptr = keyptr(right->page, right->page->cnt);
1524         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1525
1526         if( right->page->kill )
1527                 return bt->err = BTERR_struct;
1528
1529         // pull contents of right peer into our empty page
1530
1531         memcpy (set->page, right->page, bt->mgr->page_size);
1532         set->latch->dirty = 1;
1533
1534         // mark right page deleted and point it to left page
1535         //      until we can post parent updates that remove access
1536         //      to the deleted page.
1537
1538         bt_putid (right->page->right, set->latch->page_no);
1539         right->latch->dirty = 1;
1540         right->page->kill = 1;
1541
1542         bt_lockpage (bt, BtLockParent, right->latch);
1543         bt_unlockpage (bt, BtLockWrite, right->latch);
1544
1545         bt_lockpage (bt, BtLockParent, set->latch);
1546         bt_unlockpage (bt, BtLockWrite, set->latch);
1547
1548         // redirect higher key directly to our new node contents
1549
1550         bt_putid (value, set->latch->page_no);
1551         ptr = (BtKey*)higherfence;
1552
1553         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
1554           return bt->err;
1555
1556         //      delete old lower key to our node
1557
1558         ptr = (BtKey*)lowerfence;
1559
1560         if( bt_deletekey (bt, ptr->key, ptr->len, lvl+1) )
1561           return bt->err;
1562
1563         //      obtain delete and write locks to right node
1564
1565         bt_unlockpage (bt, BtLockParent, right->latch);
1566         bt_lockpage (bt, BtLockDelete, right->latch);
1567         bt_lockpage (bt, BtLockWrite, right->latch);
1568         bt_freepage (bt, right);
1569
1570         bt_unlockpage (bt, BtLockParent, set->latch);
1571         bt_unpinlatch (set->latch);
1572         bt->found = 1;
1573         return 0;
1574 }
1575
1576 //  find and delete key on page by marking delete flag bit
1577 //  if page becomes empty, delete it from the btree
1578
1579 BTERR bt_deletekey (BtDb *bt, unsigned char *key, uint len, uint lvl)
1580 {
1581 uint slot, idx, found, fence;
1582 BtPageSet set[1];
1583 BtKey *ptr;
1584 BtVal *val;
1585
1586         if( slot = bt_loadpage (bt, set, key, len, lvl, BtLockWrite) )
1587                 ptr = keyptr(set->page, slot);
1588         else
1589                 return bt->err;
1590
1591         // if librarian slot, advance to real slot
1592
1593         if( slotptr(set->page, slot)->type == Librarian )
1594                 ptr = keyptr(set->page, ++slot);
1595
1596         fence = slot == set->page->cnt;
1597
1598         // if key is found delete it, otherwise ignore request
1599
1600         if( found = !keycmp (ptr, key, len) )
1601           if( found = slotptr(set->page, slot)->dead == 0 ) {
1602                 val = valptr(set->page,slot);
1603                 slotptr(set->page, slot)->dead = 1;
1604                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1605                 set->page->act--;
1606
1607                 // collapse empty slots beneath the fence
1608
1609                 while( idx = set->page->cnt - 1 )
1610                   if( slotptr(set->page, idx)->dead ) {
1611                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1612                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1613                   } else
1614                         break;
1615           }
1616
1617         //      did we delete a fence key in an upper level?
1618
1619         if( found && lvl && set->page->act && fence )
1620           if( bt_fixfence (bt, set, lvl) )
1621                 return bt->err;
1622           else
1623                 return bt->found = found, 0;
1624
1625         //      do we need to collapse root?
1626
1627         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1628           if( bt_collapseroot (bt, set) )
1629                 return bt->err;
1630           else
1631                 return bt->found = found, 0;
1632
1633         //      delete empty page
1634
1635         if( !set->page->act )
1636                 return bt_deletepage (bt, set);
1637
1638         set->latch->dirty = 1;
1639         bt_unlockpage(bt, BtLockWrite, set->latch);
1640         bt_unpinlatch (set->latch);
1641         return bt->found = found, 0;
1642 }
1643
1644 BtKey *bt_foundkey (BtDb *bt)
1645 {
1646         return (BtKey*)(bt->key);
1647 }
1648
1649 //      advance to next slot
1650
1651 uint bt_findnext (BtDb *bt, BtPageSet *set, uint slot)
1652 {
1653 BtLatchSet *prevlatch;
1654 uid page_no;
1655
1656         if( slot < set->page->cnt )
1657                 return slot + 1;
1658
1659         prevlatch = set->latch;
1660
1661         if( page_no = bt_getid(set->page->right) )
1662           if( set->latch = bt_pinlatch (bt, page_no, 1) )
1663                 set->page = bt_mappage (bt, set->latch);
1664           else
1665                 return 0;
1666         else
1667           return bt->err = BTERR_struct, 0;
1668
1669         // obtain access lock using lock chaining with Access mode
1670
1671         bt_lockpage(bt, BtLockAccess, set->latch);
1672
1673         bt_unlockpage(bt, BtLockRead, prevlatch);
1674         bt_unpinlatch (prevlatch);
1675
1676         bt_lockpage(bt, BtLockRead, set->latch);
1677         bt_unlockpage(bt, BtLockAccess, set->latch);
1678         return 1;
1679 }
1680
1681 //      find unique key or first duplicate key in
1682 //      leaf level and return number of value bytes
1683 //      or (-1) if not found.  Setup key for bt_foundkey
1684
1685 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
1686 {
1687 BtPageSet set[1];
1688 uint len, slot;
1689 int ret = -1;
1690 BtKey *ptr;
1691 BtVal *val;
1692
1693   if( slot = bt_loadpage (bt, set, key, keylen, 0, BtLockRead) )
1694    do {
1695         ptr = keyptr(set->page, slot);
1696
1697         //      skip librarian slot place holder
1698
1699         if( slotptr(set->page, slot)->type == Librarian )
1700                 ptr = keyptr(set->page, ++slot);
1701
1702         //      return actual key found
1703
1704         memcpy (bt->key, ptr, ptr->len + sizeof(BtKey));
1705         len = ptr->len;
1706
1707         if( slotptr(set->page, slot)->type == Duplicate )
1708                 len -= BtId;
1709
1710         //      not there if we reach the stopper key
1711
1712         if( slot == set->page->cnt )
1713           if( !bt_getid (set->page->right) )
1714                 break;
1715
1716         // if key exists, return >= 0 value bytes copied
1717         //      otherwise return (-1)
1718
1719         if( slotptr(set->page, slot)->dead )
1720                 continue;
1721
1722         if( keylen == len )
1723           if( !memcmp (ptr->key, key, len) ) {
1724                 val = valptr (set->page,slot);
1725                 if( valmax > val->len )
1726                         valmax = val->len;
1727                 memcpy (value, val->value, valmax);
1728                 ret = valmax;
1729           }
1730
1731         break;
1732
1733    } while( slot = bt_findnext (bt, set, slot) );
1734
1735   bt_unlockpage (bt, BtLockRead, set->latch);
1736   bt_unpinlatch (set->latch);
1737   return ret;
1738 }
1739
1740 //      check page for space available,
1741 //      clean if necessary and return
1742 //      0 - page needs splitting
1743 //      >0  new slot value
1744
1745 uint bt_cleanpage(BtDb *bt, BtPageSet *set, uint keylen, uint slot, uint vallen)
1746 {
1747 uint nxt = bt->mgr->page_size;
1748 BtPage page = set->page;
1749 uint cnt = 0, idx = 0;
1750 uint max = page->cnt;
1751 uint newslot = max;
1752 BtKey *key;
1753 BtVal *val;
1754
1755         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1756                 return slot;
1757
1758         //      skip cleanup and proceed to split
1759         //      if there's not enough garbage
1760         //      to bother with.
1761
1762         if( page->garbage < nxt / 5 )
1763                 return 0;
1764
1765         memcpy (bt->frame, page, bt->mgr->page_size);
1766
1767         // skip page info and set rest of page to zero
1768
1769         memset (page+1, 0, bt->mgr->page_size - sizeof(*page));
1770         set->latch->dirty = 1;
1771         page->garbage = 0;
1772         page->act = 0;
1773
1774         // clean up page first by
1775         // removing deleted keys
1776
1777         while( cnt++ < max ) {
1778                 if( cnt == slot )
1779                         newslot = idx + 2;
1780
1781                 if( cnt < max || bt->frame->lvl )
1782                   if( slotptr(bt->frame,cnt)->dead )
1783                         continue;
1784
1785                 // copy the value across
1786
1787                 val = valptr(bt->frame, cnt);
1788                 nxt -= val->len + sizeof(BtVal);
1789                 memcpy ((unsigned char *)page + nxt, val, val->len + sizeof(BtVal));
1790
1791                 // copy the key across
1792
1793                 key = keyptr(bt->frame, cnt);
1794                 nxt -= key->len + sizeof(BtKey);
1795                 memcpy ((unsigned char *)page + nxt, key, key->len + sizeof(BtKey));
1796
1797                 // make a librarian slot
1798
1799                 slotptr(page, ++idx)->off = nxt;
1800                 slotptr(page, idx)->type = Librarian;
1801                 slotptr(page, idx)->dead = 1;
1802
1803                 // set up the slot
1804
1805                 slotptr(page, ++idx)->off = nxt;
1806                 slotptr(page, idx)->type = slotptr(bt->frame, cnt)->type;
1807
1808                 if( !(slotptr(page, idx)->dead = slotptr(bt->frame, cnt)->dead) )
1809                         page->act++;
1810         }
1811
1812         page->min = nxt;
1813         page->cnt = idx;
1814
1815         //      see if page has enough space now, or does it need splitting?
1816
1817         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1818                 return newslot;
1819
1820         return 0;
1821 }
1822
1823 // split the root and raise the height of the btree
1824
1825 BTERR bt_splitroot(BtDb *bt, BtPageSet *root, BtLatchSet *right)
1826 {  
1827 unsigned char leftkey[BT_keyarray];
1828 uint nxt = bt->mgr->page_size;
1829 unsigned char value[BtId];
1830 BtPageSet left[1];
1831 uid left_page_no;
1832 BtKey *ptr;
1833 BtVal *val;
1834
1835         //      save left page fence key for new root
1836
1837         ptr = keyptr(root->page, root->page->cnt);
1838         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1839
1840         //  Obtain an empty page to use, and copy the current
1841         //  root contents into it, e.g. lower keys
1842
1843         if( bt_newpage(bt, left, root->page) )
1844                 return bt->err;
1845
1846         left_page_no = left->latch->page_no;
1847         bt_unpinlatch (left->latch);
1848
1849         // preserve the page info at the bottom
1850         // of higher keys and set rest to zero
1851
1852         memset(root->page+1, 0, bt->mgr->page_size - sizeof(*root->page));
1853
1854         // insert stopper key at top of newroot page
1855         // and increase the root height
1856
1857         nxt -= BtId + sizeof(BtVal);
1858         bt_putid (value, right->page_no);
1859         val = (BtVal *)((unsigned char *)root->page + nxt);
1860         memcpy (val->value, value, BtId);
1861         val->len = BtId;
1862
1863         nxt -= 2 + sizeof(BtKey);
1864         slotptr(root->page, 2)->off = nxt;
1865         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1866         ptr->len = 2;
1867         ptr->key[0] = 0xff;
1868         ptr->key[1] = 0xff;
1869
1870         // insert lower keys page fence key on newroot page as first key
1871
1872         nxt -= BtId + sizeof(BtVal);
1873         bt_putid (value, left_page_no);
1874         val = (BtVal *)((unsigned char *)root->page + nxt);
1875         memcpy (val->value, value, BtId);
1876         val->len = BtId;
1877
1878         ptr = (BtKey *)leftkey;
1879         nxt -= ptr->len + sizeof(BtKey);
1880         slotptr(root->page, 1)->off = nxt;
1881         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1882         
1883         bt_putid(root->page->right, 0);
1884         root->page->min = nxt;          // reset lowest used offset and key count
1885         root->page->cnt = 2;
1886         root->page->act = 2;
1887         root->page->lvl++;
1888
1889         // release and unpin root pages
1890
1891         bt_unlockpage(bt, BtLockWrite, root->latch);
1892         bt_unpinlatch (root->latch);
1893
1894         bt_unpinlatch (right);
1895         return 0;
1896 }
1897
1898 //  split already locked full node
1899 //      leave it locked.
1900 //      return pool entry for new right
1901 //      page, unlocked
1902
1903 uint bt_splitpage (BtDb *bt, BtPageSet *set)
1904 {
1905 uint cnt = 0, idx = 0, max, nxt = bt->mgr->page_size;
1906 uint lvl = set->page->lvl;
1907 BtPageSet right[1];
1908 BtKey *key, *ptr;
1909 BtVal *val, *src;
1910 uid right2;
1911 uint prev;
1912
1913         //  split higher half of keys to bt->frame
1914
1915         memset (bt->frame, 0, bt->mgr->page_size);
1916         max = set->page->cnt;
1917         cnt = max / 2;
1918         idx = 0;
1919
1920         while( cnt++ < max ) {
1921                 if( cnt < max || set->page->lvl )
1922                   if( slotptr(set->page, cnt)->dead )
1923                         continue;
1924
1925                 src = valptr(set->page, cnt);
1926                 nxt -= src->len + sizeof(BtVal);
1927                 memcpy ((unsigned char *)bt->frame + nxt, src, src->len + sizeof(BtVal));
1928
1929                 key = keyptr(set->page, cnt);
1930                 nxt -= key->len + sizeof(BtKey);
1931                 ptr = (BtKey*)((unsigned char *)bt->frame + nxt);
1932                 memcpy (ptr, key, key->len + sizeof(BtKey));
1933
1934                 //      add librarian slot
1935
1936                 slotptr(bt->frame, ++idx)->off = nxt;
1937                 slotptr(bt->frame, idx)->type = Librarian;
1938                 slotptr(bt->frame, idx)->dead = 1;
1939
1940                 //  add actual slot
1941
1942                 slotptr(bt->frame, ++idx)->off = nxt;
1943                 slotptr(bt->frame, idx)->type = slotptr(set->page, cnt)->type;
1944
1945                 if( !(slotptr(bt->frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1946                         bt->frame->act++;
1947         }
1948
1949         bt->frame->bits = bt->mgr->page_bits;
1950         bt->frame->min = nxt;
1951         bt->frame->cnt = idx;
1952         bt->frame->lvl = lvl;
1953
1954         // link right node
1955
1956         if( set->latch->page_no > ROOT_page )
1957                 bt_putid (bt->frame->right, bt_getid (set->page->right));
1958
1959         //      get new free page and write higher keys to it.
1960
1961         if( bt_newpage(bt, right, bt->frame) )
1962                 return 0;
1963
1964         memcpy (bt->frame, set->page, bt->mgr->page_size);
1965         memset (set->page+1, 0, bt->mgr->page_size - sizeof(*set->page));
1966         set->latch->dirty = 1;
1967
1968         nxt = bt->mgr->page_size;
1969         set->page->garbage = 0;
1970         set->page->act = 0;
1971         max /= 2;
1972         cnt = 0;
1973         idx = 0;
1974
1975         if( slotptr(bt->frame, max)->type == Librarian )
1976                 max--;
1977
1978         //  assemble page of smaller keys
1979
1980         while( cnt++ < max ) {
1981                 if( slotptr(bt->frame, cnt)->dead )
1982                         continue;
1983                 val = valptr(bt->frame, cnt);
1984                 nxt -= val->len + sizeof(BtVal);
1985                 memcpy ((unsigned char *)set->page + nxt, val, val->len + sizeof(BtVal));
1986
1987                 key = keyptr(bt->frame, cnt);
1988                 nxt -= key->len + sizeof(BtKey);
1989                 memcpy ((unsigned char *)set->page + nxt, key, key->len + sizeof(BtKey));
1990
1991                 //      add librarian slot
1992
1993                 if( idx ) {
1994                         slotptr(set->page, ++idx)->off = nxt;
1995                         slotptr(set->page, idx)->type = Librarian;
1996                         slotptr(set->page, idx)->dead = 1;
1997                 }
1998
1999                 //      add actual slot
2000
2001                 slotptr(set->page, ++idx)->off = nxt;
2002                 slotptr(set->page, idx)->type = slotptr(bt->frame, cnt)->type;
2003                 set->page->act++;
2004         }
2005
2006         bt_putid(set->page->right, right->latch->page_no);
2007         set->page->min = nxt;
2008         set->page->cnt = idx;
2009
2010         return right->latch->entry;
2011 }
2012
2013 //      fix keys for newly split page
2014 //      call with page locked, return
2015 //      unlocked
2016
2017 BTERR bt_splitkeys (BtDb *bt, BtPageSet *set, BtLatchSet *right)
2018 {
2019 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2020 unsigned char value[BtId];
2021 uint lvl = set->page->lvl;
2022 BtPage page;
2023 BtKey *ptr;
2024
2025         // if current page is the root page, split it
2026
2027         if( set->latch->page_no == ROOT_page )
2028                 return bt_splitroot (bt, set, right);
2029
2030         ptr = keyptr(set->page, set->page->cnt);
2031         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2032
2033         page = bt_mappage (bt, right);
2034
2035         ptr = keyptr(page, page->cnt);
2036         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2037
2038         // insert new fences in their parent pages
2039
2040         bt_lockpage (bt, BtLockParent, right);
2041
2042         bt_lockpage (bt, BtLockParent, set->latch);
2043         bt_unlockpage (bt, BtLockWrite, set->latch);
2044
2045         // insert new fence for reformulated left block of smaller keys
2046
2047         bt_putid (value, set->latch->page_no);
2048         ptr = (BtKey *)leftkey;
2049
2050         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2051                 return bt->err;
2052
2053         // switch fence for right block of larger keys to new right page
2054
2055         bt_putid (value, right->page_no);
2056         ptr = (BtKey *)rightkey;
2057
2058         if( bt_insertkey (bt, ptr->key, ptr->len, lvl+1, value, BtId, 1) )
2059                 return bt->err;
2060
2061         bt_unlockpage (bt, BtLockParent, set->latch);
2062         bt_unpinlatch (set->latch);
2063
2064         bt_unlockpage (bt, BtLockParent, right);
2065         bt_unpinlatch (right);
2066         return 0;
2067 }
2068
2069 //      install new key and value onto page
2070 //      page must already be checked for
2071 //      adequate space
2072
2073 BTERR bt_insertslot (BtDb *bt, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type, uint release)
2074 {
2075 uint idx, librarian;
2076 BtSlot *node;
2077 BtKey *ptr;
2078 BtVal *val;
2079
2080         //      if found slot > desired slot and previous slot
2081         //      is a librarian slot, use it
2082
2083         if( slot > 1 )
2084           if( slotptr(set->page, slot-1)->type == Librarian )
2085                 slot--;
2086
2087         // copy value onto page
2088
2089         set->page->min -= vallen + sizeof(BtVal);
2090         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2091         memcpy (val->value, value, vallen);
2092         val->len = vallen;
2093
2094         // copy key onto page
2095
2096         set->page->min -= keylen + sizeof(BtKey);
2097         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2098         memcpy (ptr->key, key, keylen);
2099         ptr->len = keylen;
2100         
2101         //      find first empty slot
2102
2103         for( idx = slot; idx < set->page->cnt; idx++ )
2104           if( slotptr(set->page, idx)->dead )
2105                 break;
2106
2107         // now insert key into array before slot
2108
2109         if( idx == set->page->cnt )
2110                 idx += 2, set->page->cnt += 2, librarian = 2;
2111         else
2112                 librarian = 1;
2113
2114         set->latch->dirty = 1;
2115         set->page->act++;
2116
2117         while( idx > slot + librarian - 1 )
2118                 *slotptr(set->page, idx) = *slotptr(set->page, idx - librarian), idx--;
2119
2120         //      add librarian slot
2121
2122         if( librarian > 1 ) {
2123                 node = slotptr(set->page, slot++);
2124                 node->off = set->page->min;
2125                 node->type = Librarian;
2126                 node->dead = 1;
2127         }
2128
2129         //      fill in new slot
2130
2131         node = slotptr(set->page, slot);
2132         node->off = set->page->min;
2133         node->type = type;
2134         node->dead = 0;
2135
2136         if( release ) {
2137                 bt_unlockpage (bt, BtLockWrite, set->latch);
2138                 bt_unpinlatch (set->latch);
2139         }
2140
2141         return 0;
2142 }
2143
2144 //  Insert new key into the btree at given level.
2145 //      either add a new key or update/add an existing one
2146
2147 BTERR bt_insertkey (BtDb *bt, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, uint unique)
2148 {
2149 unsigned char newkey[BT_keyarray];
2150 uint slot, idx, len, entry;
2151 BtPageSet set[1];
2152 BtKey *ptr, *ins;
2153 uid sequence;
2154 BtVal *val;
2155 uint type;
2156
2157   // set up the key we're working on
2158
2159   ins = (BtKey*)newkey;
2160   memcpy (ins->key, key, keylen);
2161   ins->len = keylen;
2162
2163   // is this a non-unique index value?
2164
2165   if( unique )
2166         type = Unique;
2167   else {
2168         type = Duplicate;
2169         sequence = bt_newdup (bt);
2170         bt_putid (ins->key + ins->len + sizeof(BtKey), sequence);
2171         ins->len += BtId;
2172   }
2173
2174   while( 1 ) { // find the page and slot for the current key
2175         if( slot = bt_loadpage (bt, set, ins->key, ins->len, lvl, BtLockWrite) )
2176                 ptr = keyptr(set->page, slot);
2177         else {
2178                 if( !bt->err )
2179                         bt->err = BTERR_ovflw;
2180                 return bt->err;
2181         }
2182
2183         // if librarian slot == found slot, advance to real slot
2184
2185         if( slotptr(set->page, slot)->type == Librarian )
2186           if( !keycmp (ptr, key, keylen) )
2187                 ptr = keyptr(set->page, ++slot);
2188
2189         len = ptr->len;
2190
2191         if( slotptr(set->page, slot)->type == Duplicate )
2192                 len -= BtId;
2193
2194         //  if inserting a duplicate key or unique key
2195         //      check for adequate space on the page
2196         //      and insert the new key before slot.
2197
2198         if( unique && (len != ins->len || memcmp (ptr->key, ins->key, ins->len)) || !unique ) {
2199           if( !(slot = bt_cleanpage (bt, set, ins->len, slot, vallen)) )
2200                 if( !(entry = bt_splitpage (bt, set)) )
2201                   return bt->err;
2202                 else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2203                   return bt->err;
2204                 else
2205                   continue;
2206
2207           return bt_insertslot (bt, set, slot, ins->key, ins->len, value, vallen, type, 1);
2208         }
2209
2210         // if key already exists, update value and return
2211
2212         val = valptr(set->page, slot);
2213
2214         if( val->len >= vallen ) {
2215                 if( slotptr(set->page, slot)->dead )
2216                         set->page->act++;
2217                 set->page->garbage += val->len - vallen;
2218                 set->latch->dirty = 1;
2219                 slotptr(set->page, slot)->dead = 0;
2220                 val->len = vallen;
2221                 memcpy (val->value, value, vallen);
2222                 bt_unlockpage(bt, BtLockWrite, set->latch);
2223                 bt_unpinlatch (set->latch);
2224                 return 0;
2225         }
2226
2227         //      new update value doesn't fit in existing value area
2228
2229         if( !slotptr(set->page, slot)->dead )
2230                 set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2231         else {
2232                 slotptr(set->page, slot)->dead = 0;
2233                 set->page->act++;
2234         }
2235
2236         if( !(slot = bt_cleanpage (bt, set, keylen, slot, vallen)) )
2237           if( !(entry = bt_splitpage (bt, set)) )
2238                 return bt->err;
2239           else if( bt_splitkeys (bt, set, bt->mgr->latchsets + entry) )
2240                 return bt->err;
2241           else
2242                 continue;
2243
2244         set->page->min -= vallen + sizeof(BtVal);
2245         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2246         memcpy (val->value, value, vallen);
2247         val->len = vallen;
2248
2249         set->latch->dirty = 1;
2250         set->page->min -= keylen + sizeof(BtKey);
2251         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2252         memcpy (ptr->key, key, keylen);
2253         ptr->len = keylen;
2254         
2255         slotptr(set->page, slot)->off = set->page->min;
2256         bt_unlockpage(bt, BtLockWrite, set->latch);
2257         bt_unpinlatch (set->latch);
2258         return 0;
2259   }
2260   return 0;
2261 }
2262
2263 typedef struct {
2264         uint entry;                     // latch table entry number
2265         uint slot:31;           // page slot number
2266         uint reuse:1;           // reused previous page
2267 } AtomicTxn;
2268
2269 typedef struct {
2270         uid page_no;            // page number for split leaf
2271         void *next;                     // next key to insert
2272         uint entry:29;          // latch table entry number
2273         uint type:2;            // 0 == insert, 1 == delete, 2 == free
2274         uint nounlock:1;        // don't unlock ParentModification
2275         unsigned char leafkey[BT_keyarray];
2276 } AtomicKey;
2277
2278 //  find and load leaf page for given key
2279 //      leave page Atomic locked and Read locked.
2280
2281 int bt_atomicload (BtDb *bt, BtPageSet *set, unsigned char *key, uint len)
2282 {
2283 BtLatchSet *prevlatch;
2284 uid page_no;
2285 uint slot;
2286
2287   //  find level one slot
2288
2289   if( !(slot = bt_loadpage (bt, set, key, len, 1, BtLockRead)) )
2290         return 0;
2291
2292   // find next non-dead entry on this page
2293   //    it will be the fence key if nothing else
2294
2295   while( slotptr(set->page, slot)->dead )
2296         if( slot++ < set->page->cnt )
2297           continue;
2298         else
2299           return bt->err = BTERR_struct, 0;
2300
2301   page_no = bt_getid(valptr(set->page, slot)->value);
2302   prevlatch = set->latch;
2303
2304   while( page_no ) {
2305         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2306           set->page = bt_mappage (bt, set->latch);
2307         else
2308           return 0;
2309
2310         // obtain read lock using lock chaining with Access mode
2311         //      release & unpin parent/left sibling page
2312
2313         bt_lockpage(bt, BtLockAccess, set->latch);
2314
2315         bt_unlockpage(bt, BtLockRead, prevlatch);
2316         bt_unpinlatch (prevlatch);
2317
2318         bt_lockpage(bt, BtLockRead, set->latch);
2319
2320         //  find key on page at this level
2321         //  and descend to requested level
2322
2323         if( !set->page->kill )
2324          if( !bt_getid (set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key, len) >= 0 ) {
2325           bt_unlockpage(bt, BtLockRead, set->latch);
2326           bt_lockpage(bt, BtLockAtomic, set->latch);
2327           bt_lockpage(bt, BtLockRead, set->latch);
2328
2329           if( !set->page->kill )
2330            if( slot = bt_findslot (set->page, key, len) ) {
2331                 bt_unlockpage(bt, BtLockAccess, set->latch);
2332                 return slot;
2333            }
2334
2335           bt_unlockpage(bt, BtLockAtomic, set->latch);
2336           }
2337
2338         //  slide right into next page
2339
2340         bt_unlockpage(bt, BtLockAccess, set->latch);
2341         page_no = bt_getid(set->page->right);
2342         prevlatch = set->latch;
2343   }
2344
2345   // return error on end of right chain
2346
2347   bt->err = BTERR_struct;
2348   return 0;     // return error
2349 }
2350
2351 //      determine actual page where key is located
2352 //  return slot number
2353
2354 uint bt_atomicpage (BtDb *bt, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2355 {
2356 BtKey *key = keyptr(source,src);
2357 uint slot = locks[src].slot;
2358 uint entry;
2359
2360         if( src > 1 && locks[src].reuse )
2361           entry = locks[src-1].entry, slot = 0;
2362         else
2363           entry = locks[src].entry;
2364
2365         if( slot ) {
2366                 set->latch = bt->mgr->latchsets + entry;
2367                 set->page = bt_mappage (bt, set->latch);
2368                 return slot;
2369         }
2370
2371         //      is locks->reuse set? or was slot zeroed?
2372         //      if so, find where our key is located 
2373         //      on current page or pages split on
2374         //      same page txn operations.
2375
2376         do {
2377                 set->latch = bt->mgr->latchsets + entry;
2378                 set->page = bt_mappage (bt, set->latch);
2379
2380                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2381                   if( slotptr(set->page, slot)->type == Librarian )
2382                         slot++;
2383                   if( locks[src].reuse )
2384                         locks[src].entry = entry;
2385                   return slot;
2386                 }
2387         } while( entry = set->latch->split );
2388
2389         bt->err = BTERR_atomic;
2390         return 0;
2391 }
2392
2393 BTERR bt_atomicinsert (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2394 {
2395 BtKey *key = keyptr(source, src);
2396 BtVal *val = valptr(source, src);
2397 BtLatchSet *latch;
2398 BtPageSet set[1];
2399 uint entry, slot;
2400
2401   while( slot = bt_atomicpage (bt, source, locks, src, set) ) {
2402         if( slot = bt_cleanpage(bt, set, key->len, slot, val->len) )
2403           return bt_insertslot (bt, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type, 0);
2404
2405         if( entry = bt_splitpage (bt, set) )
2406           latch = bt->mgr->latchsets + entry;
2407         else
2408           return bt->err;
2409
2410         //      splice right page into split chain
2411         //      and WriteLock it.
2412
2413         bt_lockpage(bt, BtLockWrite, latch);
2414         latch->split = set->latch->split;
2415         set->latch->split = entry;
2416         locks[src].slot = 0;
2417   }
2418
2419   return bt->err = BTERR_atomic;
2420 }
2421
2422 BTERR bt_atomicdelete (BtDb *bt, BtPage source, AtomicTxn *locks, uint src)
2423 {
2424 BtKey *key = keyptr(source, src);
2425 uint idx, entry, slot;
2426 BtPageSet set[1];
2427 BtKey *ptr;
2428 BtVal *val;
2429
2430         if( slot = bt_atomicpage (bt, source, locks, src, set) )
2431           ptr = keyptr(set->page, slot);
2432         else
2433           return bt->err = BTERR_struct;
2434
2435         if( !keycmp (ptr, key->key, key->len) )
2436           if( !slotptr(set->page, slot)->dead )
2437                 slotptr(set->page, slot)->dead = 1;
2438           else
2439                 return 0;
2440         else
2441                 return 0;
2442
2443         val = valptr(set->page, slot);
2444         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2445         set->latch->dirty = 1;
2446         set->page->act--;
2447         bt->found++;
2448         return 0;
2449 }
2450
2451 //      delete an empty master page for a transaction
2452
2453 //      note that the far right page never empties because
2454 //      it always contains (at least) the infinite stopper key
2455 //      and that all pages that don't contain any keys are
2456 //      deleted, or are being held under Atomic lock.
2457
2458 BTERR bt_atomicfree (BtDb *bt, BtPageSet *prev)
2459 {
2460 BtPageSet right[1], temp[1];
2461 unsigned char value[BtId];
2462 uid right_page_no;
2463 BtKey *ptr;
2464
2465         bt_lockpage(bt, BtLockWrite, prev->latch);
2466
2467         //      grab the right sibling
2468
2469         if( right->latch = bt_pinlatch(bt, bt_getid (prev->page->right), 1) )
2470                 right->page = bt_mappage (bt, right->latch);
2471         else
2472                 return bt->err;
2473
2474         bt_lockpage(bt, BtLockAtomic, right->latch);
2475         bt_lockpage(bt, BtLockWrite, right->latch);
2476
2477         //      and pull contents over empty page
2478         //      while preserving master's left link
2479
2480         memcpy (right->page->left, prev->page->left, BtId);
2481         memcpy (prev->page, right->page, bt->mgr->page_size);
2482
2483         //      forward seekers to old right sibling
2484         //      to new page location in set
2485
2486         bt_putid (right->page->right, prev->latch->page_no);
2487         right->latch->dirty = 1;
2488         right->page->kill = 1;
2489
2490         //      remove pointer to right page for searchers by
2491         //      changing right fence key to point to the master page
2492
2493         ptr = keyptr(right->page,right->page->cnt);
2494         bt_putid (value, prev->latch->page_no);
2495
2496         if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2497                 return bt->err;
2498
2499         //  now that master page is in good shape we can
2500         //      remove its locks.
2501
2502         bt_unlockpage (bt, BtLockAtomic, prev->latch);
2503         bt_unlockpage (bt, BtLockWrite, prev->latch);
2504
2505         //  fix master's right sibling's left pointer
2506         //      to remove scanner's poiner to the right page
2507
2508         if( right_page_no = bt_getid (prev->page->right) ) {
2509           if( temp->latch = bt_pinlatch (bt, right_page_no, 1) )
2510                 temp->page = bt_mappage (bt, temp->latch);
2511
2512           bt_lockpage (bt, BtLockWrite, temp->latch);
2513           bt_putid (temp->page->left, prev->latch->page_no);
2514           temp->latch->dirty = 1;
2515
2516           bt_unlockpage (bt, BtLockWrite, temp->latch);
2517           bt_unpinlatch (temp->latch);
2518         } else {        // master is now the far right page
2519           bt_spinwritelock (bt->mgr->lock);
2520           bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2521           bt_spinreleasewrite(bt->mgr->lock);
2522         }
2523
2524         //      now that there are no pointers to the right page
2525         //      we can delete it after the last read access occurs
2526
2527         bt_unlockpage (bt, BtLockWrite, right->latch);
2528         bt_unlockpage (bt, BtLockAtomic, right->latch);
2529         bt_lockpage (bt, BtLockDelete, right->latch);
2530         bt_lockpage (bt, BtLockWrite, right->latch);
2531         bt_freepage (bt, right);
2532         return 0;
2533 }
2534
2535 //      atomic modification of a batch of keys.
2536
2537 //      return -1 if BTERR is set
2538 //      otherwise return slot number
2539 //      causing the key constraint violation
2540 //      or zero on successful completion.
2541
2542 int bt_atomictxn (BtDb *bt, BtPage source)
2543 {
2544 uint src, idx, slot, samepage, entry;
2545 AtomicKey *head, *tail, *leaf;
2546 BtPageSet set[1], prev[1];
2547 unsigned char value[BtId];
2548 BtKey *key, *ptr, *key2;
2549 BtLatchSet *latch;
2550 AtomicTxn *locks;
2551 int result = 0;
2552 BtSlot temp[1];
2553 BtPage page;
2554 BtVal *val;
2555 uid right;
2556 int type;
2557
2558   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2559   head = NULL;
2560   tail = NULL;
2561
2562   // stable sort the list of keys into order to
2563   //    prevent deadlocks between threads.
2564
2565   for( src = 1; src++ < source->cnt; ) {
2566         *temp = *slotptr(source,src);
2567         key = keyptr (source,src);
2568
2569         for( idx = src; --idx; ) {
2570           key2 = keyptr (source,idx);
2571           if( keycmp (key, key2->key, key2->len) < 0 ) {
2572                 *slotptr(source,idx+1) = *slotptr(source,idx);
2573                 *slotptr(source,idx) = *temp;
2574           } else
2575                 break;
2576         }
2577   }
2578
2579   // Load the leaf page for each key
2580   // group same page references with reuse bit
2581   // and determine any constraint violations
2582
2583   for( src = 0; src++ < source->cnt; ) {
2584         key = keyptr(source, src);
2585         slot = 0;
2586
2587         // first determine if this modification falls
2588         // on the same page as the previous modification
2589         //      note that the far right leaf page is a special case
2590
2591         if( samepage = src > 1 )
2592           if( samepage = !bt_getid(set->page->right) || keycmp (keyptr(set->page, set->page->cnt), key->key, key->len) >= 0 )
2593                 slot = bt_findslot(set->page, key->key, key->len);
2594           else // release read on previous page
2595                 bt_unlockpage(bt, BtLockRead, set->latch); 
2596
2597         if( !slot )
2598           if( slot = bt_atomicload(bt, set, key->key, key->len) )
2599                 set->latch->split = 0;
2600           else
2601                 return -1;
2602
2603         if( slotptr(set->page, slot)->type == Librarian )
2604           ptr = keyptr(set->page, ++slot);
2605         else
2606           ptr = keyptr(set->page, slot);
2607
2608         if( !samepage ) {
2609           locks[src].entry = set->latch->entry;
2610           locks[src].slot = slot;
2611           locks[src].reuse = 0;
2612         } else {
2613           locks[src].entry = 0;
2614           locks[src].slot = 0;
2615           locks[src].reuse = 1;
2616         }
2617
2618         switch( slotptr(source, src)->type ) {
2619         case Duplicate:
2620         case Unique:
2621           if( !slotptr(set->page, slot)->dead )
2622            if( slot < set->page->cnt || bt_getid (set->page->right) )
2623             if( !keycmp (ptr, key->key, key->len) ) {
2624
2625                   // return constraint violation if key already exists
2626
2627                   bt_unlockpage(bt, BtLockRead, set->latch);
2628                   result = src;
2629
2630                   while( src ) {
2631                         if( locks[src].entry ) {
2632                           set->latch = bt->mgr->latchsets + locks[src].entry;
2633                           bt_unlockpage(bt, BtLockAtomic, set->latch);
2634                           bt_unpinlatch (set->latch);
2635                         }
2636                         src--;
2637                   }
2638                   free (locks);
2639                   return result;
2640                 }
2641           break;
2642         }
2643   }
2644
2645   //  unlock last loadpage lock
2646
2647   if( source->cnt > 1 )
2648         bt_unlockpage(bt, BtLockRead, set->latch);
2649
2650   //  obtain write lock for each master page
2651
2652   for( src = 0; src++ < source->cnt; )
2653         if( locks[src].reuse )
2654           continue;
2655         else
2656           bt_lockpage(bt, BtLockWrite, bt->mgr->latchsets + locks[src].entry);
2657
2658   // insert or delete each key
2659   // process any splits or merges
2660   // release Write & Atomic latches
2661   // set ParentModifications and build
2662   // queue of keys to insert for split pages
2663   // or delete for deleted pages.
2664
2665   // run through txn list backwards
2666
2667   samepage = source->cnt + 1;
2668
2669   for( src = source->cnt; src; src-- ) {
2670         if( locks[src].reuse )
2671           continue;
2672
2673         //  perform the txn operations
2674         //      from smaller to larger on
2675         //  the same page
2676
2677         for( idx = src; idx < samepage; idx++ )
2678          switch( slotptr(source,idx)->type ) {
2679          case Delete:
2680           if( bt_atomicdelete (bt, source, locks, idx) )
2681                 return -1;
2682           break;
2683
2684         case Duplicate:
2685         case Unique:
2686           if( bt_atomicinsert (bt, source, locks, idx) )
2687                 return -1;
2688           break;
2689         }
2690
2691         //      after the same page operations have finished,
2692         //  process master page for splits or deletion.
2693
2694         latch = prev->latch = bt->mgr->latchsets + locks[src].entry;
2695         prev->page = bt_mappage (bt, prev->latch);
2696         samepage = src;
2697
2698         //  pick-up all splits from master page
2699         //      each one is already WriteLocked.
2700
2701         entry = prev->latch->split;
2702
2703         while( entry ) {
2704           set->latch = bt->mgr->latchsets + entry;
2705           set->page = bt_mappage (bt, set->latch);
2706           entry = set->latch->split;
2707
2708           // delete empty master page by undoing its split
2709           //  (this is potentially another empty page)
2710           //  note that there are no new left pointers yet
2711
2712           if( !prev->page->act ) {
2713                 memcpy (set->page->left, prev->page->left, BtId);
2714                 memcpy (prev->page, set->page, bt->mgr->page_size);
2715                 bt_lockpage (bt, BtLockDelete, set->latch);
2716                 bt_freepage (bt, set);
2717
2718                 prev->latch->dirty = 1;
2719                 continue;
2720           }
2721
2722           // remove empty page from the split chain
2723
2724           if( !set->page->act ) {
2725                 memcpy (prev->page->right, set->page->right, BtId);
2726                 prev->latch->split = set->latch->split;
2727                 bt_lockpage (bt, BtLockDelete, set->latch);
2728                 bt_freepage (bt, set);
2729                 continue;
2730           }
2731
2732           //  schedule prev fence key update
2733
2734           ptr = keyptr(prev->page,prev->page->cnt);
2735           leaf = calloc (sizeof(AtomicKey), 1);
2736
2737           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2738           leaf->page_no = prev->latch->page_no;
2739           leaf->entry = prev->latch->entry;
2740           leaf->type = 0;
2741
2742           if( tail )
2743                 tail->next = leaf;
2744           else
2745                 head = leaf;
2746
2747           tail = leaf;
2748
2749           // splice in the left link into the split page
2750
2751           bt_putid (set->page->left, prev->latch->page_no);
2752           bt_lockpage(bt, BtLockParent, prev->latch);
2753           bt_unlockpage(bt, BtLockWrite, prev->latch);
2754           *prev = *set;
2755         }
2756
2757         //  update left pointer in next right page from last split page
2758         //      (if all splits were reversed, latch->split == 0)
2759
2760         if( latch->split ) {
2761           //  fix left pointer in master's original (now split)
2762           //  far right sibling or set rightmost page in page zero
2763
2764           if( right = bt_getid (prev->page->right) ) {
2765                 if( set->latch = bt_pinlatch (bt, right, 1) )
2766                   set->page = bt_mappage (bt, set->latch);
2767                 else
2768                   return -1;
2769
2770             bt_lockpage (bt, BtLockWrite, set->latch);
2771             bt_putid (set->page->left, prev->latch->page_no);
2772                 set->latch->dirty = 1;
2773             bt_unlockpage (bt, BtLockWrite, set->latch);
2774                 bt_unpinlatch (set->latch);
2775           } else {      // prev is rightmost page
2776             bt_spinwritelock (bt->mgr->lock);
2777                 bt_putid (bt->mgr->pagezero->alloc->left, prev->latch->page_no);
2778             bt_spinreleasewrite(bt->mgr->lock);
2779           }
2780
2781           //  Process last page split in chain
2782
2783           ptr = keyptr(prev->page,prev->page->cnt);
2784           leaf = calloc (sizeof(AtomicKey), 1);
2785
2786           memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2787           leaf->page_no = prev->latch->page_no;
2788           leaf->entry = prev->latch->entry;
2789           leaf->type = 0;
2790   
2791           if( tail )
2792                 tail->next = leaf;
2793           else
2794                 head = leaf;
2795
2796           tail = leaf;
2797
2798           bt_lockpage(bt, BtLockParent, prev->latch);
2799           bt_unlockpage(bt, BtLockWrite, prev->latch);
2800
2801           //  remove atomic lock on master page
2802
2803           bt_unlockpage(bt, BtLockAtomic, latch);
2804           continue;
2805         }
2806
2807         //  finished if prev page occupied (either master or final split)
2808
2809         if( prev->page->act ) {
2810           bt_unlockpage(bt, BtLockWrite, latch);
2811           bt_unlockpage(bt, BtLockAtomic, latch);
2812           bt_unpinlatch(latch);
2813           continue;
2814         }
2815
2816         // any and all splits were reversed, and the
2817         // master page located in prev is empty, delete it
2818         // by pulling over master's right sibling.
2819
2820         // Remove empty master's fence key
2821
2822         ptr = keyptr(prev->page,prev->page->cnt);
2823
2824         if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2825                 return -1;
2826
2827         //      perform the remainder of the delete
2828         //      from the FIFO queue
2829
2830         leaf = calloc (sizeof(AtomicKey), 1);
2831
2832         memcpy (leaf->leafkey, ptr, ptr->len + sizeof(BtKey));
2833         leaf->page_no = prev->latch->page_no;
2834         leaf->entry = prev->latch->entry;
2835         leaf->nounlock = 1;
2836         leaf->type = 2;
2837   
2838         if( tail )
2839           tail->next = leaf;
2840         else
2841           head = leaf;
2842
2843         tail = leaf;
2844
2845         //      leave atomic lock in place until
2846         //      deletion completes in next phase.
2847
2848         bt_unlockpage(bt, BtLockWrite, prev->latch);
2849   }
2850   
2851   //  add & delete keys for any pages split or merged during transaction
2852
2853   if( leaf = head )
2854     do {
2855           set->latch = bt->mgr->latchsets + leaf->entry;
2856           set->page = bt_mappage (bt, set->latch);
2857
2858           bt_putid (value, leaf->page_no);
2859           ptr = (BtKey *)leaf->leafkey;
2860
2861           switch( leaf->type ) {
2862           case 0:       // insert key
2863             if( bt_insertkey (bt, ptr->key, ptr->len, 1, value, BtId, 1) )
2864                   return -1;
2865
2866                 break;
2867
2868           case 1:       // delete key
2869                 if( bt_deletekey (bt, ptr->key, ptr->len, 1) )
2870                   return -1;
2871
2872                 break;
2873
2874           case 2:       // free page
2875                 if( bt_atomicfree (bt, set) )
2876                   return -1;
2877
2878                 break;
2879           }
2880
2881           if( !leaf->nounlock )
2882             bt_unlockpage (bt, BtLockParent, set->latch);
2883
2884           bt_unpinlatch (set->latch);
2885           tail = leaf->next;
2886           free (leaf);
2887         } while( leaf = tail );
2888
2889   // return success
2890
2891   free (locks);
2892   return 0;
2893 }
2894
2895 //      set cursor to highest slot on highest page
2896
2897 uint bt_lastkey (BtDb *bt)
2898 {
2899 uid page_no = bt_getid (bt->mgr->pagezero->alloc->left);
2900 BtPageSet set[1];
2901
2902         if( set->latch = bt_pinlatch (bt, page_no, 1) )
2903                 set->page = bt_mappage (bt, set->latch);
2904         else
2905                 return 0;
2906
2907     bt_lockpage(bt, BtLockRead, set->latch);
2908         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2909     bt_unlockpage(bt, BtLockRead, set->latch);
2910         bt_unpinlatch (set->latch);
2911
2912         bt->cursor_page = page_no;
2913         return bt->cursor->cnt;
2914 }
2915
2916 //      return previous slot on cursor page
2917
2918 uint bt_prevkey (BtDb *bt, uint slot)
2919 {
2920 uid ourright, next, us = bt->cursor_page;
2921 BtPageSet set[1];
2922
2923         if( --slot )
2924                 return slot;
2925
2926         ourright = bt_getid(bt->cursor->right);
2927
2928 goleft:
2929         if( !(next = bt_getid(bt->cursor->left)) )
2930                 return 0;
2931
2932 findourself:
2933         bt->cursor_page = next;
2934
2935         if( set->latch = bt_pinlatch (bt, next, 1) )
2936                 set->page = bt_mappage (bt, set->latch);
2937         else
2938                 return 0;
2939
2940     bt_lockpage(bt, BtLockRead, set->latch);
2941         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2942         bt_unlockpage(bt, BtLockRead, set->latch);
2943         bt_unpinlatch (set->latch);
2944         
2945         next = bt_getid (bt->cursor->right);
2946
2947         if( bt->cursor->kill )
2948                 goto findourself;
2949
2950         if( next != us )
2951           if( next == ourright )
2952                 goto goleft;
2953           else
2954                 goto findourself;
2955
2956         return bt->cursor->cnt;
2957 }
2958
2959 //  return next slot on cursor page
2960 //  or slide cursor right into next page
2961
2962 uint bt_nextkey (BtDb *bt, uint slot)
2963 {
2964 BtPageSet set[1];
2965 uid right;
2966
2967   do {
2968         right = bt_getid(bt->cursor->right);
2969
2970         while( slot++ < bt->cursor->cnt )
2971           if( slotptr(bt->cursor,slot)->dead )
2972                 continue;
2973           else if( right || (slot < bt->cursor->cnt) ) // skip infinite stopper
2974                 return slot;
2975           else
2976                 break;
2977
2978         if( !right )
2979                 break;
2980
2981         bt->cursor_page = right;
2982
2983         if( set->latch = bt_pinlatch (bt, right, 1) )
2984                 set->page = bt_mappage (bt, set->latch);
2985         else
2986                 return 0;
2987
2988     bt_lockpage(bt, BtLockRead, set->latch);
2989
2990         memcpy (bt->cursor, set->page, bt->mgr->page_size);
2991
2992         bt_unlockpage(bt, BtLockRead, set->latch);
2993         bt_unpinlatch (set->latch);
2994         slot = 0;
2995
2996   } while( 1 );
2997
2998   return bt->err = 0;
2999 }
3000
3001 //  cache page of keys into cursor and return starting slot for given key
3002
3003 uint bt_startkey (BtDb *bt, unsigned char *key, uint len)
3004 {
3005 BtPageSet set[1];
3006 uint slot;
3007
3008         // cache page for retrieval
3009
3010         if( slot = bt_loadpage (bt, set, key, len, 0, BtLockRead) )
3011           memcpy (bt->cursor, set->page, bt->mgr->page_size);
3012         else
3013           return 0;
3014
3015         bt->cursor_page = set->latch->page_no;
3016
3017         bt_unlockpage(bt, BtLockRead, set->latch);
3018         bt_unpinlatch (set->latch);
3019         return slot;
3020 }
3021
3022 BtKey *bt_key(BtDb *bt, uint slot)
3023 {
3024         return keyptr(bt->cursor, slot);
3025 }
3026
3027 BtVal *bt_val(BtDb *bt, uint slot)
3028 {
3029         return valptr(bt->cursor,slot);
3030 }
3031
3032 #ifdef STANDALONE
3033
3034 #ifndef unix
3035 double getCpuTime(int type)
3036 {
3037 FILETIME crtime[1];
3038 FILETIME xittime[1];
3039 FILETIME systime[1];
3040 FILETIME usrtime[1];
3041 SYSTEMTIME timeconv[1];
3042 double ans = 0;
3043
3044         memset (timeconv, 0, sizeof(SYSTEMTIME));
3045
3046         switch( type ) {
3047         case 0:
3048                 GetSystemTimeAsFileTime (xittime);
3049                 FileTimeToSystemTime (xittime, timeconv);
3050                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3051                 break;
3052         case 1:
3053                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3054                 FileTimeToSystemTime (usrtime, timeconv);
3055                 break;
3056         case 2:
3057                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3058                 FileTimeToSystemTime (systime, timeconv);
3059                 break;
3060         }
3061
3062         ans += (double)timeconv->wHour * 3600;
3063         ans += (double)timeconv->wMinute * 60;
3064         ans += (double)timeconv->wSecond;
3065         ans += (double)timeconv->wMilliseconds / 1000;
3066         return ans;
3067 }
3068 #else
3069 #include <time.h>
3070 #include <sys/resource.h>
3071
3072 double getCpuTime(int type)
3073 {
3074 struct rusage used[1];
3075 struct timeval tv[1];
3076
3077         switch( type ) {
3078         case 0:
3079                 gettimeofday(tv, NULL);
3080                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3081
3082         case 1:
3083                 getrusage(RUSAGE_SELF, used);
3084                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3085
3086         case 2:
3087                 getrusage(RUSAGE_SELF, used);
3088                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3089         }
3090
3091         return 0;
3092 }
3093 #endif
3094
3095 void bt_poolaudit (BtMgr *mgr)
3096 {
3097 BtLatchSet *latch;
3098 uint slot = 0;
3099
3100         while( slot++ < mgr->latchdeployed ) {
3101                 latch = mgr->latchsets + slot;
3102
3103                 if( *latch->readwr->rin & MASK )
3104                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", slot, latch->page_no);
3105                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3106
3107                 if( *latch->access->rin & MASK )
3108                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", slot, latch->page_no);
3109                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3110
3111                 if( *latch->parent->ticket != *latch->parent->serving )
3112                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", slot, latch->page_no);
3113                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3114
3115                 if( latch->pin & ~CLOCK_bit ) {
3116                         fprintf(stderr, "latchset %d pinned for page %.8x\n", slot, latch->page_no);
3117                         latch->pin = 0;
3118                 }
3119         }
3120 }
3121
3122 uint bt_latchaudit (BtDb *bt)
3123 {
3124 ushort idx, hashidx;
3125 uid next, page_no;
3126 BtLatchSet *latch;
3127 uint cnt = 0;
3128 BtKey *ptr;
3129
3130         if( *(ushort *)(bt->mgr->lock) )
3131                 fprintf(stderr, "Alloc page locked\n");
3132         *(ushort *)(bt->mgr->lock) = 0;
3133
3134         for( idx = 1; idx <= bt->mgr->latchdeployed; idx++ ) {
3135                 latch = bt->mgr->latchsets + idx;
3136                 if( *latch->readwr->rin & MASK )
3137                         fprintf(stderr, "latchset %d rwlocked for page %.8x\n", idx, latch->page_no);
3138                 memset ((ushort *)latch->readwr, 0, sizeof(RWLock));
3139
3140                 if( *latch->access->rin & MASK )
3141                         fprintf(stderr, "latchset %d accesslocked for page %.8x\n", idx, latch->page_no);
3142                 memset ((ushort *)latch->access, 0, sizeof(RWLock));
3143
3144                 if( *latch->parent->ticket != *latch->parent->serving )
3145                         fprintf(stderr, "latchset %d parentlocked for page %.8x\n", idx, latch->page_no);
3146                 memset ((ushort *)latch->parent, 0, sizeof(RWLock));
3147
3148                 if( latch->pin ) {
3149                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3150                         latch->pin = 0;
3151                 }
3152         }
3153
3154         for( hashidx = 0; hashidx < bt->mgr->latchhash; hashidx++ ) {
3155           if( *(ushort *)(bt->mgr->hashtable[hashidx].latch) )
3156                         fprintf(stderr, "hash entry %d locked\n", hashidx);
3157
3158           *(ushort *)(bt->mgr->hashtable[hashidx].latch) = 0;
3159
3160           if( idx = bt->mgr->hashtable[hashidx].slot ) do {
3161                 latch = bt->mgr->latchsets + idx;
3162                 if( latch->pin )
3163                         fprintf(stderr, "latchset %d pinned for page %.8x\n", idx, latch->page_no);
3164           } while( idx = latch->next );
3165         }
3166
3167         page_no = LEAF_page;
3168
3169         while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3170         uid off = page_no << bt->mgr->page_bits;
3171 #ifdef unix
3172           pread (bt->mgr->idx, bt->frame, bt->mgr->page_size, off);
3173 #else
3174         DWORD amt[1];
3175
3176           SetFilePointer (bt->mgr->idx, (long)off, (long*)(&off)+1, FILE_BEGIN);
3177
3178           if( !ReadFile(bt->mgr->idx, bt->frame, bt->mgr->page_size, amt, NULL))
3179                 return bt->err = BTERR_map;
3180
3181           if( *amt <  bt->mgr->page_size )
3182                 return bt->err = BTERR_map;
3183 #endif
3184                 if( !bt->frame->free && !bt->frame->lvl )
3185                         cnt += bt->frame->act;
3186                 page_no++;
3187         }
3188                 
3189         cnt--;  // remove stopper key
3190         fprintf(stderr, " Total keys read %d\n", cnt);
3191
3192         bt_close (bt);
3193         return 0;
3194 }
3195
3196 typedef struct {
3197         char idx;
3198         char *type;
3199         char *infile;
3200         BtMgr *mgr;
3201         int num;
3202 } ThreadArg;
3203
3204 //  standalone program to index file of keys
3205 //  then list them onto std-out
3206
3207 #ifdef unix
3208 void *index_file (void *arg)
3209 #else
3210 uint __stdcall index_file (void *arg)
3211 #endif
3212 {
3213 int line = 0, found = 0, cnt = 0, idx;
3214 uid next, page_no = LEAF_page;  // start on first page of leaves
3215 int ch, len = 0, slot, type = 0;
3216 unsigned char key[BT_maxkey];
3217 unsigned char txn[65536];
3218 ThreadArg *args = arg;
3219 BtPageSet set[1];
3220 uint nxt = 65536;
3221 BtPage page;
3222 BtKey *ptr;
3223 BtVal *val;
3224 BtDb *bt;
3225 FILE *in;
3226
3227         bt = bt_open (args->mgr);
3228         page = (BtPage)txn;
3229
3230         if( args->idx < strlen (args->type) )
3231                 ch = args->type[args->idx];
3232         else
3233                 ch = args->type[strlen(args->type) - 1];
3234
3235         switch(ch | 0x20)
3236         {
3237         case 'a':
3238                 fprintf(stderr, "started latch mgr audit\n");
3239                 cnt = bt_latchaudit (bt);
3240                 fprintf(stderr, "finished latch mgr audit, found %d keys\n", cnt);
3241                 break;
3242
3243         case 'd':
3244                 type = Delete;
3245
3246         case 'p':
3247                 if( !type )
3248                         type = Unique;
3249
3250                 if( args->num )
3251                  if( type == Delete )
3252                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3253                  else
3254                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3255                 else
3256                  if( type == Delete )
3257                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3258                  else
3259                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3260
3261                 if( in = fopen (args->infile, "rb") )
3262                   while( ch = getc(in), ch != EOF )
3263                         if( ch == '\n' )
3264                         {
3265                           line++;
3266
3267                           if( !args->num ) {
3268                             if( bt_insertkey (bt, key, 10, 0, key + 10, len - 10, 1) )
3269                                   fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3270                             len = 0;
3271                                 continue;
3272                           }
3273
3274                           nxt -= len - 10;
3275                           memcpy (txn + nxt, key + 10, len - 10);
3276                           nxt -= 1;
3277                           txn[nxt] = len - 10;
3278                           nxt -= 10;
3279                           memcpy (txn + nxt, key, 10);
3280                           nxt -= 1;
3281                           txn[nxt] = 10;
3282                           slotptr(page,++cnt)->off  = nxt;
3283                           slotptr(page,cnt)->type = type;
3284                           len = 0;
3285
3286                           if( cnt < args->num )
3287                                 continue;
3288
3289                           page->cnt = cnt;
3290                           page->act = cnt;
3291                           page->min = nxt;
3292
3293                           if( bt_atomictxn (bt, page) )
3294                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3295                           nxt = sizeof(txn);
3296                           cnt = 0;
3297
3298                         }
3299                         else if( len < BT_maxkey )
3300                                 key[len++] = ch;
3301                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes %d found\n", args->infile, line, bt->reads, bt->writes, bt->found);
3302                 break;
3303
3304         case 'w':
3305                 fprintf(stderr, "started indexing for %s\n", args->infile);
3306                 if( in = fopen (args->infile, "r") )
3307                   while( ch = getc(in), ch != EOF )
3308                         if( ch == '\n' )
3309                         {
3310                           line++;
3311
3312                           if( bt_insertkey (bt, key, len, 0, NULL, 0, 1) )
3313                                 fprintf(stderr, "Error %d Line: %d\n", bt->err, line), exit(0);
3314                           len = 0;
3315                         }
3316                         else if( len < BT_maxkey )
3317                                 key[len++] = ch;
3318                 fprintf(stderr, "finished %s for %d keys: %d reads %d writes\n", args->infile, line, bt->reads, bt->writes);
3319                 break;
3320
3321         case 'f':
3322                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3323                 if( in = fopen (args->infile, "rb") )
3324                   while( ch = getc(in), ch != EOF )
3325                         if( ch == '\n' )
3326                         {
3327                           line++;
3328                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3329                                 found++;
3330                           else if( bt->err )
3331                                 fprintf(stderr, "Error %d Syserr %d Line: %d\n", bt->err, errno, line), exit(0);
3332                           len = 0;
3333                         }
3334                         else if( len < BT_maxkey )
3335                                 key[len++] = ch;
3336                 fprintf(stderr, "finished %s for %d keys, found %d: %d reads %d writes\n", args->infile, line, found, bt->reads, bt->writes);
3337                 break;
3338
3339         case 's':
3340                 fprintf(stderr, "started scanning\n");
3341                 do {
3342                         if( set->latch = bt_pinlatch (bt, page_no, 1) )
3343                                 set->page = bt_mappage (bt, set->latch);
3344                         else
3345                                 fprintf(stderr, "unable to obtain latch"), exit(1);
3346                         bt_lockpage (bt, BtLockRead, set->latch);
3347                         next = bt_getid (set->page->right);
3348
3349                         for( slot = 0; slot++ < set->page->cnt; )
3350                          if( next || slot < set->page->cnt )
3351                           if( !slotptr(set->page, slot)->dead ) {
3352                                 ptr = keyptr(set->page, slot);
3353                                 len = ptr->len;
3354
3355                             if( slotptr(set->page, slot)->type == Duplicate )
3356                                         len -= BtId;
3357
3358                                 fwrite (ptr->key, len, 1, stdout);
3359                                 val = valptr(set->page, slot);
3360                                 fwrite (val->value, val->len, 1, stdout);
3361                                 fputc ('\n', stdout);
3362                                 cnt++;
3363                            }
3364
3365                         bt_unlockpage (bt, BtLockRead, set->latch);
3366                         bt_unpinlatch (set->latch);
3367                 } while( page_no = next );
3368
3369                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3370                 break;
3371
3372         case 'r':
3373                 fprintf(stderr, "started reverse scan\n");
3374                 if( slot = bt_lastkey (bt) )
3375                    while( slot = bt_prevkey (bt, slot) ) {
3376                         if( slotptr(bt->cursor, slot)->dead )
3377                           continue;
3378
3379                         ptr = keyptr(bt->cursor, slot);
3380                         len = ptr->len;
3381
3382                         if( slotptr(bt->cursor, slot)->type == Duplicate )
3383                                 len -= BtId;
3384
3385                         fwrite (ptr->key, len, 1, stdout);
3386                         val = valptr(bt->cursor, slot);
3387                         fwrite (val->value, val->len, 1, stdout);
3388                         fputc ('\n', stdout);
3389                         cnt++;
3390                   }
3391
3392                 fprintf(stderr, " Total keys read %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3393                 break;
3394
3395         case 'c':
3396 #ifdef unix
3397                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3398 #endif
3399                 fprintf(stderr, "started counting\n");
3400                 page_no = LEAF_page;
3401
3402                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3403                         if( bt_readpage (bt->mgr, bt->frame, page_no) )
3404                                 break;
3405
3406                         if( !bt->frame->free && !bt->frame->lvl )
3407                                 cnt += bt->frame->act;
3408
3409                         bt->reads++;
3410                         page_no++;
3411                 }
3412                 
3413                 cnt--;  // remove stopper key
3414                 fprintf(stderr, " Total keys counted %d: %d reads, %d writes\n", cnt, bt->reads, bt->writes);
3415                 break;
3416         }
3417
3418         bt_close (bt);
3419 #ifdef unix
3420         return NULL;
3421 #else
3422         return 0;
3423 #endif
3424 }
3425
3426 typedef struct timeval timer;
3427
3428 int main (int argc, char **argv)
3429 {
3430 int idx, cnt, len, slot, err;
3431 int segsize, bits = 16;
3432 double start, stop;
3433 #ifdef unix
3434 pthread_t *threads;
3435 #else
3436 HANDLE *threads;
3437 #endif
3438 ThreadArg *args;
3439 uint poolsize = 0;
3440 float elapsed;
3441 int num = 0;
3442 char key[1];
3443 BtMgr *mgr;
3444 BtKey *ptr;
3445 BtDb *bt;
3446
3447         if( argc < 3 ) {
3448                 fprintf (stderr, "Usage: %s idx_file cmds [page_bits buffer_pool_size txn_size src_file1 src_file2 ... ]\n", argv[0]);
3449                 fprintf (stderr, "  where idx_file is the name of the btree file\n");
3450                 fprintf (stderr, "  cmds is a string of (c)ount/(r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with one character command for each input src_file. Commands with no input file need a placeholder.\n");
3451                 fprintf (stderr, "  page_bits is the page size in bits\n");
3452                 fprintf (stderr, "  buffer_pool_size is the number of pages in buffer pool\n");
3453                 fprintf (stderr, "  txn_size = n to block transactions into n units, or zero for no transactions\n");
3454                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3455                 exit(0);
3456         }
3457
3458         start = getCpuTime(0);
3459
3460         if( argc > 3 )
3461                 bits = atoi(argv[3]);
3462
3463         if( argc > 4 )
3464                 poolsize = atoi(argv[4]);
3465
3466         if( !poolsize )
3467                 fprintf (stderr, "Warning: no mapped_pool\n");
3468
3469         if( argc > 5 )
3470                 num = atoi(argv[5]);
3471
3472         cnt = argc - 6;
3473 #ifdef unix
3474         threads = malloc (cnt * sizeof(pthread_t));
3475 #else
3476         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3477 #endif
3478         args = malloc (cnt * sizeof(ThreadArg));
3479
3480         mgr = bt_mgr ((argv[1]), bits, poolsize);
3481
3482         if( !mgr ) {
3483                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3484                 exit (1);
3485         }
3486
3487         //      fire off threads
3488
3489         for( idx = 0; idx < cnt; idx++ ) {
3490                 args[idx].infile = argv[idx + 6];
3491                 args[idx].type = argv[2];
3492                 args[idx].mgr = mgr;
3493                 args[idx].num = num;
3494                 args[idx].idx = idx;
3495 #ifdef unix
3496                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3497                         fprintf(stderr, "Error creating thread %d\n", err);
3498 #else
3499                 threads[idx] = (HANDLE)_beginthreadex(NULL, 65536, index_file, args + idx, 0, NULL);
3500 #endif
3501         }
3502
3503         //      wait for termination
3504
3505 #ifdef unix
3506         for( idx = 0; idx < cnt; idx++ )
3507                 pthread_join (threads[idx], NULL);
3508 #else
3509         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3510
3511         for( idx = 0; idx < cnt; idx++ )
3512                 CloseHandle(threads[idx]);
3513
3514 #endif
3515         bt_poolaudit(mgr);
3516         bt_mgrclose (mgr);
3517
3518         elapsed = getCpuTime(0) - start;
3519         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3520         elapsed = getCpuTime(1);
3521         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3522         elapsed = getCpuTime(2);
3523         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3524 }
3525
3526 #endif  //STANDALONE