]> pd.if.org Git - btree/blob - threadskv10g.c
Fix a concurrency bug in page pinning in the buffer pool
[btree] / threadskv10g.c
1 // btree version threadskv10g futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair re-entrant reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      LSM B-trees for write optimization
11 //      larger sized leaf pages than non-leaf
12 //      and LSM B-tree find & count operations
13
14 // 28 OCT 2014
15
16 // author: karl malbrain, malbrain@cal.berkeley.edu
17
18 /*
19 This work, including the source code, documentation
20 and related data, is placed into the public domain.
21
22 The orginal author is Karl Malbrain.
23
24 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
25 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
26 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
27 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
28 RESULTING FROM THE USE, MODIFICATION, OR
29 REDISTRIBUTION OF THIS SOFTWARE.
30 */
31
32 // Please see the project home page for documentation
33 // code.google.com/p/high-concurrency-btree
34
35 #define _FILE_OFFSET_BITS 64
36 #define _LARGEFILE64_SOURCE
37
38 #ifdef linux
39 #define _GNU_SOURCE
40 #include <xmmintrin.h>
41 #include <linux/futex.h>
42 #define SYS_futex 202
43 #endif
44
45 #ifdef unix
46 #include <unistd.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <fcntl.h>
50 #include <sys/time.h>
51 #include <sys/mman.h>
52 #include <errno.h>
53 #include <pthread.h>
54 #include <limits.h>
55 #else
56 #define WIN32_LEAN_AND_MEAN
57 #include <windows.h>
58 #include <stdio.h>
59 #include <stdlib.h>
60 #include <time.h>
61 #include <fcntl.h>
62 #include <process.h>
63 #include <intrin.h>
64 #endif
65
66 #include <memory.h>
67 #include <string.h>
68 #include <stddef.h>
69
70 typedef unsigned long long      uid;
71 typedef unsigned long long      logseqno;
72
73 #ifndef unix
74 typedef unsigned long long      off64_t;
75 typedef unsigned short          ushort;
76 typedef unsigned int            uint;
77 #endif
78
79 #define BT_ro 0x6f72    // ro
80 #define BT_rw 0x7772    // rw
81
82 #define BT_maxbits              26                                      // maximum page size in bits
83 #define BT_minbits              9                                       // minimum page size in bits
84 #define BT_minpage              (1 << BT_minbits)       // minimum page size
85 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
86
87 //  BTree page number constants
88 #define ALLOC_page              0       // allocation page
89 #define ROOT_page               1       // root of the btree
90 #define LEAF_page               2       // first page of leaves
91
92 //      Number of levels to create in a new BTree
93
94 #define MIN_lvl                 2
95
96 /*
97 There are six lock types for each node in four independent sets: 
98 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
99 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
100 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
101 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
102 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
103 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification. 
104 */
105
106 typedef enum{
107         BtLockAccess = 1,
108         BtLockDelete = 2,
109         BtLockRead   = 4,
110         BtLockWrite  = 8,
111         BtLockParent = 16,
112         BtLockLink   = 32
113 } BtLock;
114
115 typedef struct {
116   union {
117         struct {
118           volatile unsigned char xcl[1];
119           volatile unsigned char filler;
120           volatile ushort waiters[1];
121         } bits[1];
122         uint value[1];
123   };
124 } MutexLatch;
125
126 //      definition for reader/writer reentrant lock implementation
127
128 typedef struct {
129   MutexLatch xcl[1];
130   MutexLatch wrt[1];
131   ushort readers;
132   ushort dup;           // re-entrant locks
133   ushort tid;           // owner thread-no
134   ushort line;          // owner line #
135 } RWLock;
136
137 //  hash table entries
138
139 typedef struct {
140         MutexLatch latch[1];
141         uint entry;             // Latch table entry at head of chain
142 } BtHashEntry;
143
144 //      latch manager table structure
145
146 typedef struct {
147         uid page_no;                    // latch set page number
148         RWLock readwr[1];               // read/write page lock
149         RWLock access[1];               // Access Intent/Page delete
150         RWLock parent[1];               // Posting of fence key in parent
151         RWLock link[1];                 // left link update in progress
152         MutexLatch modify[1];   // modify entry lite latch
153         uint split;                             // right split page atomic insert
154         uint next;                              // next entry in hash table chain
155         uint prev;                              // prev entry in hash table chain
156         volatile ushort pin;    // number of accessing threads
157         unsigned char dirty;    // page in cache is dirty
158         unsigned char leaf;             // page in cache is a leaf
159 } BtLatchSet;
160
161 //      Define the length of the page record numbers
162
163 #define BtId 6
164
165 //      Page key slot definition.
166
167 //      Keys are marked dead, but remain on the page until
168 //      it cleanup is called. The fence key (highest key) for
169 //      a leaf page is always present, even after cleanup.
170
171 //      Slot types
172
173 //      In addition to the Unique keys that occupy slots
174 //      there are Librarian and Duplicate key
175 //      slots occupying the key slot array.
176
177 //      The Librarian slots are dead keys that
178 //      serve as filler, available to add new Unique
179 //      or Dup slots that are inserted into the B-tree.
180
181 //      The Duplicate slots have had their key bytes extended
182 //      by 6 bytes to contain a binary duplicate key uniqueifier.
183
184 typedef enum {
185         Unique,
186         Update,
187         Librarian,
188         Duplicate,
189         Delete
190 } BtSlotType;
191
192 typedef struct {
193         uint off:BT_maxbits;    // page offset for key start
194         uint type:3;                    // type of slot
195         uint dead:1;                    // set for deleted slot
196 } BtSlot;
197
198 //      The key structure occupies space at the upper end of
199 //      each page.  It's a length byte followed by the key
200 //      bytes.
201
202 typedef struct {
203         unsigned char len;              // this can be changed to a ushort or uint
204         unsigned char key[0];
205 } BtKey;
206
207 //      the value structure also occupies space at the upper
208 //      end of the page. Each key is immediately followed by a value.
209
210 typedef struct {
211         unsigned char len;              // this can be changed to a ushort or uint
212         unsigned char value[0];
213 } BtVal;
214
215 #define BT_maxkey       255             // maximum number of bytes in a key
216 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
217
218 //      The first part of an index page.
219 //      It is immediately followed
220 //      by the BtSlot array of keys.
221
222 typedef struct BtPage_ {
223         uint cnt;                                       // count of keys in page
224         uint act;                                       // count of active keys
225         uint min;                                       // next key offset
226         uint garbage;                           // page garbage in bytes
227         unsigned char lvl;                      // level of page
228         unsigned char free;                     // page is on free chain
229         unsigned char kill;                     // page is being deleted
230         unsigned char nopromote;        // page is being constructed
231         unsigned char filler1[6];       // padding to multiple of 8 bytes
232         unsigned char right[BtId];      // page number to right
233         unsigned char left[BtId];       // page number to left
234         unsigned char filler2[2];       // padding to multiple of 8 bytes
235         logseqno lsn;                           // log sequence number applied
236         uid page_no;                            // this page number
237 } *BtPage;
238
239 //  The loadpage interface object
240
241 typedef struct {
242         BtPage page;            // current page pointer
243         BtLatchSet *latch;      // current page latch set
244 } BtPageSet;
245
246 //      structure for latch manager on ALLOC_page
247
248 typedef struct {
249         struct BtPage_ alloc[1];                // next page_no in right ptr
250         unsigned char freechain[BtId];  // head of free page_nos chain
251         unsigned char leafchain[BtId];  // head of leaf page_nos chain
252         unsigned long long leafpages;   // number of active leaf pages
253         unsigned long long upperpages;  // number of active upper pages
254         uint redopages;                                 // number of redo pages in file
255         unsigned char leaf_xtra;                // leaf page size in xtra bits
256         unsigned char page_bits;                // base page size in bits
257 } BtPageZero;
258
259 //      The object structure for Btree access
260
261 typedef struct {
262         uint page_size;                         // base page size       
263         uint page_bits;                         // base page size in bits       
264         uint leaf_xtra;                         // leaf xtra bits       
265 #ifdef unix
266         int idx;
267 #else
268         HANDLE idx;
269 #endif
270         BtPageZero *pagezero;           // mapped allocation page
271         BtHashEntry *hashtable;         // the buffer pool hash table entries
272         BtHashEntry *leaftable;         // the buffer pool hash table entries
273         BtLatchSet *latchsets;          // mapped latch set from buffer pool
274         BtLatchSet *leafsets;           // mapped latch set from buffer pool
275         unsigned char *pagepool;        // mapped to the buffer pool pages
276         unsigned char *leafpool;        // mapped to the leaf pool pages
277         unsigned char *redobuff;        // mapped recovery buffer pointer
278         logseqno lsn, flushlsn;         // current & first lsn flushed
279         MutexLatch redo[1];                     // redo area lite latch
280         MutexLatch lock[1];                     // allocation area lite latch
281         ushort thread_no[1];            // highest thread number issued
282         ushort err_thread;                      // error thread number
283         uint nlatchpage;                        // size of buffer pool & latchsets
284         uint latchtotal;                        // number of page latch entries
285         uint latchhash;                         // number of latch hash table slots
286         uint latchvictim;                       // next latch entry to swap out
287         uint nleafpage;                         // size of leaf pool & leafsets
288         uint leaftotal;                         // number of leaf latch entries
289         uint leafhash;                          // number of leaf hash table slots
290         uint leafvictim;                        // next leaf entry to swap out
291         uint leafpromote;                       // next leaf entry to promote
292         uint redopage;                          // page number of redo buffer
293         uint redolast;                          // last msync size of recovery buff
294         uint redoend;                           // eof/end element in recovery buff
295         int err;                                        // last error
296         int line;                                       // last error line no
297         int found;                                      // number of keys found by delete
298         int reads, writes;                      // number of reads and writes
299         int type;                                       // type of LSM tree 0=cache, 1=main
300 #ifndef unix
301         HANDLE halloc;                          // allocation handle
302         HANDLE hpool;                           // buffer pool handle
303 #endif
304         unsigned char *page0;           // memory mapped pagezero of b-tree
305 } BtMgr;
306
307 typedef struct {
308         BtMgr *mgr;                                     // buffer manager for entire process
309         BtMgr *main;                            // buffer manager for main btree
310         BtPageSet cacheset[1];  // cached page frame for cache btree
311         BtPageSet mainset[1];   // cached page frame for main btree
312         uint cacheslot;                         // slot number in cacheset
313         uint mainslot;                          // slot number in mainset
314         ushort phase;                           // 1 = main btree 0 = cache btree 2 = both
315         ushort thread_no;                       // thread number
316         BtSlot *cachenode;
317         BtSlot *mainnode;
318         BtKey *cachekey;
319         BtKey *mainkey;
320         BtVal *cacheval;
321         BtVal *mainval;
322 } BtDb;
323
324 //      atomic txn structure
325
326 typedef struct {
327         logseqno reqlsn;        // redo log seq no required
328         uint entry:31;          // latch table entry number
329         uint reuse:1;           // reused previous page
330         uint slot;                      // slot on page
331 } AtomicTxn;
332
333 //      Catastrophic errors
334
335 typedef enum {
336         BTERR_ok = 0,
337         BTERR_struct,
338         BTERR_ovflw,
339         BTERR_lock,
340         BTERR_map,
341         BTERR_read,
342         BTERR_wrt,
343         BTERR_atomic,
344         BTERR_recovery
345 } BTERR;
346
347 #define CLOCK_bit 0x8000
348
349 // recovery manager entry types
350
351 typedef enum {
352         BTRM_eof = 0,   // rest of buffer is emtpy
353         BTRM_add,               // add a unique key-value to btree
354         BTRM_dup,               // add a duplicate key-value to btree
355         BTRM_del,               // delete a key-value from btree
356         BTRM_upd,               // update a key with a new value
357         BTRM_new,               // allocate a new empty page
358         BTRM_old                // reuse an old empty page
359 } BTRM;
360
361 // recovery manager entry
362 //      structure followed by BtKey & BtVal
363
364 typedef struct {
365         logseqno reqlsn;        // log sequence number required
366         logseqno lsn;           // log sequence number for entry
367         uint len;                       // length of entry
368         unsigned char type;     // type of entry
369         unsigned char lvl;      // level of btree entry pertains to
370 } BtLogHdr;
371
372 // B-Tree functions
373
374 extern void bt_close (BtDb *bt);
375 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
376 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
377 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
378 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
379 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
380 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
381 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
382
383 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
384
385 extern BTERR bt_startkey (BtDb *db, unsigned char *key, uint len);
386 extern BTERR bt_nextkey (BtDb *bt);
387
388 extern uint bt_lastkey (BtDb *bt);
389 extern uint bt_prevkey (BtDb *bt);
390
391 //      manager functions
392 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
393 extern void bt_mgrclose (BtMgr *mgr);
394 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
395 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
396
397 //      atomic transaction functions
398 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
399 BTERR bt_promote (BtDb *bt);
400
401 //  The page is allocated from low and hi ends.
402 //  The key slots are allocated from the bottom,
403 //      while the text and value of the key
404 //  are allocated from the top.  When the two
405 //  areas meet, the page is split into two.
406
407 //  A key consists of a length byte, two bytes of
408 //  index number (0 - 65535), and up to 253 bytes
409 //  of key value.
410
411 //  Associated with each key is a value byte string
412 //      containing any value desired.
413
414 //  The b-tree root is always located at page 1.
415 //      The first leaf page of level zero is always
416 //      located on page 2.
417
418 //      The b-tree pages are linked with next
419 //      pointers to facilitate enumerators,
420 //      and provide for concurrency.
421
422 //      When to root page fills, it is split in two and
423 //      the tree height is raised by a new root at page
424 //      one with two keys.
425
426 //      Deleted keys are marked with a dead bit until
427 //      page cleanup. The fence key for a leaf node is
428 //      always present
429
430 //  To achieve maximum concurrency one page is locked at a time
431 //  as the tree is traversed to find leaf key in question. The right
432 //  page numbers are used in cases where the page is being split,
433 //      or consolidated.
434
435 //  Page 0 is dedicated to lock for new page extensions,
436 //      and chains empty pages together for reuse. It also
437 //      contains the latch manager hash table.
438
439 //      The ParentModification lock on a node is obtained to serialize posting
440 //      or changing the fence key for a node.
441
442 //      Empty pages are chained together through the ALLOC page and reused.
443
444 //      Access macros to address slot and key values from the page
445 //      Page slots use 1 based indexing.
446
447 #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
448 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
449 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
450
451 void bt_putid(unsigned char *dest, uid id)
452 {
453 int i = BtId;
454
455         while( i-- )
456                 dest[i] = (unsigned char)id, id >>= 8;
457 }
458
459 uid bt_getid(unsigned char *src)
460 {
461 uid id = 0;
462 int i;
463
464         for( i = 0; i < BtId; i++ )
465                 id <<= 8, id |= *src++; 
466
467         return id;
468 }
469
470 //      lite weight spin lock Latch Manager
471
472 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
473 {
474         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
475 }
476
477 void bt_mutexlock(MutexLatch *latch)
478 {
479 uint idx, waited = 0;
480 MutexLatch prev[1];
481
482  while( 1 ) {
483   for( idx = 0; idx < 100; idx++ ) {
484         *prev->value = __sync_fetch_and_or (latch->value, 1);
485         if( !*prev->bits->xcl ) {
486           if( waited )
487                 __sync_fetch_and_sub (latch->bits->waiters, 1);
488           return;
489         }
490   }
491
492   if( !waited ) {
493         __sync_fetch_and_add (latch->bits->waiters, 1);
494         *prev->bits->waiters += 1;
495         waited++;
496   }
497
498   sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
499  }
500 }
501
502 int bt_mutextry(MutexLatch *latch)
503 {
504         return !__sync_lock_test_and_set (latch->bits->xcl, 1);
505 }
506
507 void bt_releasemutex(MutexLatch *latch)
508 {
509 MutexLatch prev[1];
510
511         *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
512
513         if( *prev->bits->waiters )
514                 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
515 }
516
517 //      reader/writer lock implementation
518
519 void WriteLock (RWLock *lock, ushort tid, uint line)
520 {
521         if( lock->tid == tid ) {
522                 lock->dup++;
523                 return;
524         }
525         bt_mutexlock (lock->xcl);
526         bt_mutexlock (lock->wrt);
527         bt_releasemutex (lock->xcl);
528
529         lock->line = line;
530         lock->tid = tid;
531 }
532
533 void WriteRelease (RWLock *lock)
534 {
535         if( lock->dup ) {
536                 lock->dup--;
537                 return;
538         }
539         lock->tid = 0;
540         bt_releasemutex (lock->wrt);
541 }
542
543 void ReadLock (RWLock *lock, ushort tid)
544 {
545         bt_mutexlock (lock->xcl);
546
547         if( !__sync_fetch_and_add (&lock->readers, 1) )
548                 bt_mutexlock (lock->wrt);
549
550         bt_releasemutex (lock->xcl);
551 }
552
553 void ReadRelease (RWLock *lock)
554 {
555         if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
556                 bt_releasemutex (lock->wrt);
557 }
558
559 //      recovery manager -- flush dirty pages
560
561 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
562 {
563 uint cnt3 = 0, cnt2 = 0, cnt = 0;
564 uint entry, segment;
565 BtLatchSet *latch;
566 BtPage page;
567
568   //    flush dirty pool pages to the btree
569
570 fprintf(stderr, "Start flushlsn  ");
571   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
572         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
573         latch = mgr->latchsets + entry;
574         bt_mutexlock (latch->modify);
575         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
576
577         if( latch->dirty ) {
578           bt_writepage(mgr, page, latch->page_no, 0);
579           latch->dirty = 0, cnt++;
580         }
581 if( latch->pin & ~CLOCK_bit )
582 cnt2++;
583         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
584         bt_releasemutex (latch->modify);
585   }
586   for( entry = 1; entry < mgr->leaftotal; entry++ ) {
587         page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
588         latch = mgr->leafsets + entry;
589         bt_mutexlock (latch->modify);
590         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
591
592         if( latch->dirty ) {
593           bt_writepage(mgr, page, latch->page_no, 1);
594           latch->dirty = 0, cnt++;
595         }
596 if( latch->pin & ~CLOCK_bit )
597 cnt2++;
598         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
599         bt_releasemutex (latch->modify);
600   }
601 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
602 }
603
604 //      recovery manager -- process current recovery buff on startup
605 //      this won't do much if previous session was properly closed.
606
607 BTERR bt_recoveryredo (BtMgr *mgr)
608 {
609 BtLogHdr *hdr, *eof;
610 uint offset = 0;
611 BtKey *key;
612 BtVal *val;
613
614   hdr = (BtLogHdr *)mgr->redobuff;
615   mgr->flushlsn = hdr->lsn;
616
617   while( 1 ) {
618         hdr = (BtLogHdr *)(mgr->redobuff + offset);
619         switch( hdr->type ) {
620         case BTRM_eof:
621                 mgr->lsn = hdr->lsn;
622                 return 0;
623         case BTRM_add:          // add a unique key-value to btree
624         
625         case BTRM_dup:          // add a duplicate key-value to btree
626         case BTRM_del:          // delete a key-value from btree
627         case BTRM_upd:          // update a key with a new value
628         case BTRM_new:          // allocate a new empty page
629         case BTRM_old:          // reuse an old empty page
630                 return 0;
631         }
632   }
633 }
634
635 //      recovery manager -- append new entry to recovery log
636 //      flush dirty pages to disk when it overflows.
637
638 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
639 {
640 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
641 uint amt = sizeof(BtLogHdr);
642 BtLogHdr *hdr, *eof;
643 uint last, end;
644
645         bt_mutexlock (mgr->redo);
646
647         if( key )
648           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
649
650         //      see if new entry fits in buffer
651         //      flush and reset if it doesn't
652
653         if( amt > size - mgr->redoend ) {
654           mgr->flushlsn = mgr->lsn;
655           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
656                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
657           mgr->redolast = 0;
658           mgr->redoend = 0;
659           bt_flushlsn(mgr, thread_no);
660         }
661
662         //      fill in new entry & either eof or end block
663
664         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
665
666         hdr->len = amt;
667         hdr->type = type;
668         hdr->lvl = lvl;
669         hdr->lsn = ++mgr->lsn;
670
671         mgr->redoend += amt;
672
673         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
674         memset (eof, 0, sizeof(BtLogHdr));
675
676         //  fill in key and value
677
678         if( key ) {
679           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
680           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
681         }
682
683         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
684         memset (eof, 0, sizeof(BtLogHdr));
685         eof->lsn = mgr->lsn;
686
687         last = mgr->redolast & ~0xfff;
688         end = mgr->redoend;
689
690         if( end - last + sizeof(BtLogHdr) >= 32768 )
691           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
692                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
693           else
694                 mgr->redolast = end;
695
696         bt_releasemutex(mgr->redo);
697         return hdr->lsn;
698 }
699
700 //      recovery manager -- append transaction to recovery log
701 //      flush dirty pages to disk when it overflows.
702
703 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
704 {
705 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
706 uint amt = 0, src, type;
707 BtLogHdr *hdr, *eof;
708 uint last, end;
709 logseqno lsn;
710 BtKey *key;
711 BtVal *val;
712
713         //      determine amount of redo recovery log space required
714
715         for( src = 0; src++ < source->cnt; ) {
716           key = keyptr(source,src);
717           val = valptr(source,src);
718           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
719           amt += sizeof(BtLogHdr);
720         }
721
722         bt_mutexlock (mgr->redo);
723
724         //      see if new entry fits in buffer
725         //      flush and reset if it doesn't
726
727         if( amt > size - mgr->redoend ) {
728           mgr->flushlsn = mgr->lsn;
729           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
730                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
731           mgr->redolast = 0;
732           mgr->redoend = 0;
733           bt_flushlsn (mgr, thread_no);
734         }
735
736         //      assign new lsn to transaction
737
738         lsn = ++mgr->lsn;
739
740         //      fill in new entries
741
742         for( src = 0; src++ < source->cnt; ) {
743           key = keyptr(source, src);
744           val = valptr(source, src);
745
746           switch( slotptr(source, src)->type ) {
747           case Unique:
748                 type = BTRM_add;
749                 break;
750           case Duplicate:
751                 type = BTRM_dup;
752                 break;
753           case Delete:
754                 type = BTRM_del;
755                 break;
756           }
757
758           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
759           amt += sizeof(BtLogHdr);
760
761           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
762           hdr->len = amt;
763           hdr->type = type;
764           hdr->lsn = lsn;
765           hdr->lvl = 0;
766
767           //  fill in key and value
768
769           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
770           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
771
772           mgr->redoend += amt;
773         }
774
775         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
776         memset (eof, 0, sizeof(BtLogHdr));
777         eof->lsn = lsn;
778
779         last = mgr->redolast & ~0xfff;
780         end = mgr->redoend;
781
782         if( end - last + sizeof(BtLogHdr) >= 32768 )
783           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
784                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
785           else
786                 mgr->redolast = end;
787
788         bt_releasemutex(mgr->redo);
789         return lsn;
790 }
791
792 //      read page into buffer pool from permanent location in Btree file
793
794 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
795 {
796 uint page_size = mgr->page_size;
797
798   if( leaf )
799         page_size <<= mgr->leaf_xtra;
800
801   if( pread(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
802         return mgr->err = BTERR_read;
803
804   __sync_fetch_and_add (&mgr->reads, 1);
805   return 0;
806 }
807
808 //      write page to permanent location in Btree file
809
810 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
811 {
812 uint page_size = mgr->page_size;
813
814   if( leaf )
815         page_size <<= mgr->leaf_xtra;
816
817   if( pwrite(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
818         return mgr->err = BTERR_wrt;
819
820   __sync_fetch_and_add (&mgr->writes, 1);
821   return 0;
822 }
823
824 //      set CLOCK bit in latch
825 //      decrement pin count
826
827 void bt_unpinlatch (BtLatchSet *latch, uint dirty, ushort thread_no, uint line)
828 {
829         bt_mutexlock(latch->modify);
830         if( dirty )
831                 latch->dirty = 1;
832         latch->pin |= CLOCK_bit;
833         latch->pin--;
834         bt_releasemutex(latch->modify);
835 }
836
837 //  return the btree cached page address
838
839 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
840 {
841 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
842 BtPage page;
843
844   if( latch->leaf )
845         page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
846   else
847         page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
848
849   return page;
850 }
851
852 //  return next available leaf entry
853 //        and with latch entry locked
854
855 uint bt_availleaf (BtMgr *mgr)
856 {
857 BtLatchSet *latch;
858 uint entry;
859
860   while( 1 ) {
861 #ifdef unix
862         entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
863 #else
864         entry = _InterlockedIncrement (&mgr->leafvictim);
865 #endif
866         entry %= mgr->leaftotal;
867
868         if( !entry )
869                 continue;
870
871         latch = mgr->leafsets + entry;
872
873         if( !bt_mutextry(latch->modify) )
874                 continue;
875
876         //  return this entry if it is not pinned
877
878         if( !latch->pin )
879                 return entry;
880
881         //      if the CLOCK bit is set
882         //      reset it to zero.
883
884         latch->pin &= ~CLOCK_bit;
885         bt_releasemutex(latch->modify);
886   }
887 }
888
889 //  return next available latch entry
890 //        and with latch entry locked
891
892 uint bt_availnext (BtMgr *mgr)
893 {
894 BtLatchSet *latch;
895 uint entry;
896
897   while( 1 ) {
898 #ifdef unix
899         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
900 #else
901         entry = _InterlockedIncrement (&mgr->latchvictim);
902 #endif
903         entry %= mgr->latchtotal;
904
905         if( !entry )
906                 continue;
907
908         latch = mgr->latchsets + entry;
909
910         if( !bt_mutextry(latch->modify) )
911                 continue;
912
913         //  return this entry if it is not pinned
914
915         if( !latch->pin )
916                 return entry;
917
918         //      if the CLOCK bit is set
919         //      reset it to zero.
920
921         latch->pin &= ~CLOCK_bit;
922         bt_releasemutex(latch->modify);
923   }
924 }
925
926 //      pin leaf in leaf buffer pool
927 //      return with latchset pinned
928
929 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
930 {
931 uint hashidx = page_no % mgr->leafhash;
932 uint entry, oldidx;
933 BtLatchSet *latch;
934 BtPage page;
935
936   //  try to find our entry
937
938   bt_mutexlock(mgr->leaftable[hashidx].latch);
939
940   if( entry = mgr->leaftable[hashidx].entry )
941    do {
942         latch = mgr->leafsets + entry;
943
944         if( page_no == latch->page_no )
945                 break;
946    } while( entry = latch->next );
947
948   //  found our entry: increment pin
949
950   if( entry ) {
951         latch = mgr->leafsets + entry;
952         bt_mutexlock(latch->modify);
953         latch->pin |= CLOCK_bit;
954         latch->pin++;
955
956         bt_releasemutex(latch->modify);
957         bt_releasemutex(mgr->leaftable[hashidx].latch);
958         return latch;
959   }
960
961   //  find and reuse unpinned entry
962
963 trynext:
964   entry = bt_availleaf (mgr);
965   latch = mgr->leafsets + entry;
966
967   page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
968   oldidx = latch->page_no % mgr->leafhash;
969
970   //  skip over this entry if old latch is not available
971
972   if( latch->page_no )
973    if( oldidx != hashidx )
974     if( !bt_mutextry (mgr->leaftable[oldidx].latch) ) {
975           bt_releasemutex(latch->modify);
976           goto trynext;
977         }
978
979   //  update permanent page area in btree from buffer pool
980   //    no read-lock is required since page is not pinned.
981
982   if( latch->dirty )
983         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
984           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
985         else
986           latch->dirty = 0;
987
988   //  if latch is on a different hash chain
989   //    unlink from the old page_no chain
990
991   if( latch->page_no )
992    if( oldidx != hashidx ) {
993         if( latch->prev )
994           mgr->leafsets[latch->prev].next = latch->next;
995         else
996           mgr->leaftable[oldidx].entry = latch->next;
997
998         if( latch->next )
999           mgr->leafsets[latch->next].prev = latch->prev;
1000
1001     bt_releasemutex (mgr->leaftable[oldidx].latch);
1002    }
1003
1004   if( bt_readpage (mgr, page, page_no, 1) )
1005         return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1006
1007   //  link page as head of hash table chain
1008   //  if this is a never before used entry,
1009   //  or it was previously on a different
1010   //  hash table chain. Otherwise, just
1011   //  leave it in its current hash table
1012   //  chain position.
1013
1014   if( !latch->page_no || hashidx != oldidx ) {
1015         if( latch->next = mgr->leaftable[hashidx].entry )
1016           mgr->leafsets[latch->next].prev = entry;
1017
1018         mgr->leaftable[hashidx].entry = entry;
1019     latch->prev = 0;
1020   }
1021
1022   //  fill in latch structure
1023
1024   latch->pin = CLOCK_bit | 1;
1025   latch->page_no = page_no;
1026   latch->leaf = 1;
1027
1028   bt_releasemutex (latch->modify);
1029   bt_releasemutex (mgr->leaftable[hashidx].latch);
1030   return latch;
1031 }
1032
1033 //      pin page in non-leaf buffer pool
1034 //      return with latchset pinned
1035
1036 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1037 {
1038 uint hashidx = page_no % mgr->latchhash;
1039 uint entry, oldidx;
1040 BtLatchSet *latch;
1041 BtPage page;
1042
1043   //  try to find our entry
1044
1045   bt_mutexlock(mgr->hashtable[hashidx].latch);
1046
1047   if( entry = mgr->hashtable[hashidx].entry ) do
1048   {
1049         latch = mgr->latchsets + entry;
1050
1051         if( page_no == latch->page_no )
1052                 break;
1053   } while( entry = latch->next );
1054
1055   //  found our entry: increment pin
1056
1057   if( entry ) {
1058         latch = mgr->latchsets + entry;
1059         bt_mutexlock(latch->modify);
1060         latch->pin |= CLOCK_bit;
1061         latch->pin++;
1062         bt_releasemutex(latch->modify);
1063         bt_releasemutex(mgr->hashtable[hashidx].latch);
1064         return latch;
1065   }
1066
1067   //  find and reuse unpinned entry
1068
1069 trynext:
1070   entry = bt_availnext (mgr);
1071   latch = mgr->latchsets + entry;
1072
1073   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1074   oldidx = latch->page_no % mgr->latchhash;
1075
1076   //  skip over this entry if latch not available
1077
1078   if( latch->page_no )
1079    if( oldidx != hashidx )
1080     if( !bt_mutextry (mgr->hashtable[oldidx].latch) ) {
1081           bt_releasemutex(latch->modify);
1082           goto trynext;
1083         }
1084
1085   //  update permanent page area in btree from buffer pool
1086   //    no read-lock is required since page is not pinned.
1087
1088   if( latch->dirty )
1089         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1090           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1091         else
1092           latch->dirty = 0;
1093
1094   //  if latch is on a different hash chain
1095   //    unlink from the old page_no chain
1096
1097   if( latch->page_no )
1098    if( oldidx != hashidx ) {
1099         if( latch->prev )
1100           mgr->latchsets[latch->prev].next = latch->next;
1101         else
1102           mgr->hashtable[oldidx].entry = latch->next;
1103
1104         if( latch->next )
1105           mgr->latchsets[latch->next].prev = latch->prev;
1106
1107     bt_releasemutex (mgr->hashtable[oldidx].latch);
1108    }
1109
1110   if( bt_readpage (mgr, page, page_no, 0) )
1111         return mgr->line = __LINE__, NULL;
1112
1113   //  link page as head of hash table chain
1114   //  if this is a never before used entry,
1115   //  or it was previously on a different
1116   //  hash table chain. Otherwise, just
1117   //  leave it in its current hash table
1118   //  chain position.
1119
1120   if( !latch->page_no || hashidx != oldidx ) {
1121         if( latch->next = mgr->hashtable[hashidx].entry )
1122           mgr->latchsets[latch->next].prev = entry;
1123
1124         mgr->hashtable[hashidx].entry = entry;
1125     latch->prev = 0;
1126   }
1127
1128   //  fill in latch structure
1129
1130   latch->pin = CLOCK_bit | 1;
1131   latch->page_no = page_no;
1132   latch->leaf = 0;
1133
1134   bt_releasemutex (latch->modify);
1135   bt_releasemutex (mgr->hashtable[hashidx].latch);
1136   return latch;
1137 }
1138   
1139 void bt_mgrclose (BtMgr *mgr)
1140 {
1141 char *name = mgr->type ? "Main" : "Cache";
1142 BtLatchSet *latch;
1143 BtLogHdr *eof;
1144 uint num = 0;
1145 BtPage page;
1146 uint entry;
1147
1148         //      flush previously written dirty pages
1149         //      and write recovery buffer to disk
1150
1151         fdatasync (mgr->idx);
1152
1153         if( mgr->redoend ) {
1154                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1155                 memset (eof, 0, sizeof(BtLogHdr));
1156         }
1157
1158         //      write remaining dirty pool pages to the btree
1159
1160         for( entry = 1; entry < mgr->latchtotal; entry++ ) {
1161                 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1162                 latch = mgr->latchsets + entry;
1163
1164                 if( latch->dirty ) {
1165                         bt_writepage(mgr, page, latch->page_no, 0);
1166                         latch->dirty = 0, num++;
1167                 }
1168         }
1169
1170         if( num )
1171           fprintf(stderr, "%s %d non-leaf buffer pool pages flushed\n", name, num);
1172
1173         num = 0;
1174
1175         //      write remaining dirty leaf pages to the btree
1176
1177         for( entry = 1; entry < mgr->leaftotal; entry++ ) {
1178                 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1179                 latch = mgr->leafsets + entry;
1180
1181                 if( latch->dirty ) {
1182                         bt_writepage(mgr, page, latch->page_no, 1);
1183                         latch->dirty = 0, num++;
1184                 }
1185         }
1186
1187         if( num )
1188           fprintf(stderr, "%s %d leaf buffer pool pages flushed\n", name, num);
1189
1190         //      clear redo recovery buffer on disk.
1191
1192         if( mgr->pagezero->redopages ) {
1193                 eof = (BtLogHdr *)mgr->redobuff;
1194                 memset (eof, 0, sizeof(BtLogHdr));
1195                 eof->lsn = mgr->lsn;
1196                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1197                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1198         }
1199
1200 #ifdef unix
1201         munmap (mgr->page0, (uid)(mgr->redopage + mgr->pagezero->redopages) << mgr->page_bits);
1202
1203         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1204         munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1205         munmap (mgr->pagezero, mgr->page_size);
1206 #else
1207         FlushViewOfFile(mgr->pagezero, 0);
1208         UnmapViewOfFile(mgr->pagezero);
1209         UnmapViewOfFile(mgr->pagepool);
1210         CloseHandle(mgr->halloc);
1211         CloseHandle(mgr->hpool);
1212 #endif
1213 #ifdef unix
1214         close (mgr->idx);
1215         free (mgr);
1216 #else
1217         FlushFileBuffers(mgr->idx);
1218         CloseHandle(mgr->idx);
1219         GlobalFree (mgr);
1220 #endif
1221 }
1222
1223 //      close and release memory
1224
1225 void bt_close (BtDb *bt)
1226 {
1227         free (bt);
1228 }
1229
1230 //  open/create new btree buffer manager
1231
1232 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1233 //              size of page pool (e.g. 300) and leaf pool (e.g. 4000)
1234
1235 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1236 {
1237 uint lvl, attr, last, slot, idx;
1238 uint nlatchpage, latchhash;
1239 uint nleafpage, leafhash;
1240 unsigned char value[BtId];
1241 int flag, initit = 0;
1242 BtPageZero *pagezero;
1243 BtLatchSet *latch;
1244 off64_t size;
1245 uint amt[1];
1246 BtMgr* mgr;
1247 BtKey* key;
1248 BtVal *val;
1249
1250         // determine sanity of page size and buffer pool
1251
1252         if( leafxtra + pagebits > BT_maxbits )
1253                 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1254
1255         if( pagebits < BT_minbits )
1256                 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1257
1258 #ifdef unix
1259         mgr = calloc (1, sizeof(BtMgr));
1260
1261         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1262
1263         if( mgr->idx == -1 ) {
1264                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1265                 return free(mgr), NULL;
1266         }
1267 #else
1268         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1269         attr = FILE_ATTRIBUTE_NORMAL;
1270         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1271
1272         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1273                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1274                 return GlobalFree(mgr), NULL;
1275         }
1276 #endif
1277
1278 #ifdef unix
1279         pagezero = valloc (BT_maxpage);
1280         *amt = 0;
1281
1282         // read minimum page size to get root info
1283         //      to support raw disk partition files
1284         //      check if page_bits == 0 on the disk.
1285
1286         if( size = lseek (mgr->idx, 0L, 2) )
1287                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1288                         if( pagezero->page_bits ) {
1289                                 pagebits = pagezero->page_bits;
1290                                 leafxtra = pagezero->leaf_xtra;
1291                                 redopages = pagezero->redopages;
1292                         } else
1293                                 initit = 1;
1294                 else
1295                         return free(mgr), free(pagezero), NULL;
1296         else
1297                 initit = 1;
1298 #else
1299         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1300         size = GetFileSize(mgr->idx, amt);
1301
1302         if( size || *amt ) {
1303                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1304                         return bt_mgrclose (mgr), NULL;
1305                 pagebits = pagezero->page_bits;
1306                 leafxtra = pagezero->leaf_xtra;
1307                 redopages = pagezero->redopages;
1308         } else
1309                 initit = 1;
1310 #endif
1311
1312         mgr->page_size = 1 << pagebits;
1313         mgr->page_bits = pagebits;
1314         mgr->leaf_xtra = leafxtra;
1315
1316         //  calculate number of latch hash table entries
1317
1318         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1319
1320         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1321         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1322         mgr->latchtotal = nodemax;
1323
1324         //  calculate number of leaf hash table entries
1325
1326         mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1327
1328         mgr->nleafpage += leafmax << leafxtra;          // size of the leaf pool in pages
1329         mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1330         mgr->leaftotal = leafmax;
1331
1332         mgr->redopage = LEAF_page + (1 << leafxtra);
1333
1334         if( !initit )
1335                 goto mgrlatch;
1336
1337         // initialize an empty b-tree with latch page, root page, page of leaves
1338         // and page(s) of latches and page pool cache
1339
1340         ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1341         memset (pagezero, 0, 1 << pagebits);
1342         pagezero->alloc->lvl = MIN_lvl - 1;
1343         pagezero->redopages = redopages;
1344         pagezero->page_bits = mgr->page_bits;
1345         pagezero->leaf_xtra = leafxtra;
1346
1347         bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
1348         pagezero->upperpages = 1;
1349         pagezero->leafpages = 1;
1350
1351         //  initialize left-most LEAF page in
1352         //      alloc->left and count of active leaf pages.
1353
1354         bt_putid (pagezero->alloc->left, LEAF_page);
1355
1356         if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1357                 fprintf (stderr, "Unable to create btree page zero\n");
1358                 return bt_mgrclose (mgr), NULL;
1359         }
1360
1361         memset (pagezero, 0, 1 << pagebits);
1362
1363         for( lvl=MIN_lvl; lvl--; ) {
1364         BtSlot *node = slotptr(pagezero->alloc, 1);
1365                 node->off = mgr->page_size;
1366
1367                 if( !lvl )
1368                         node->off <<= mgr->leaf_xtra;
1369
1370                 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1371                 node->type = Librarian;
1372                 node++->dead = 1;
1373
1374                 node->off = node[-1].off;
1375                 key = keyptr(pagezero->alloc, 2);
1376                 key = keyptr(pagezero->alloc, 1);
1377                 key->len = 2;           // create stopper key
1378                 key->key[0] = 0xff;
1379                 key->key[1] = 0xff;
1380
1381                 bt_putid(value, MIN_lvl - lvl + 1);
1382                 val = valptr(pagezero->alloc, 1);
1383                 val->len = lvl ? BtId : 0;
1384                 memcpy (val->value, value, val->len);
1385
1386                 pagezero->alloc->min = node->off;
1387                 pagezero->alloc->lvl = lvl;
1388                 pagezero->alloc->cnt = 2;
1389                 pagezero->alloc->act = 1;
1390                 pagezero->alloc->page_no = MIN_lvl - lvl;
1391
1392                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1393                         fprintf (stderr, "Unable to create btree page\n");
1394                         return bt_mgrclose (mgr), NULL;
1395                 }
1396         }
1397
1398 mgrlatch:
1399 #ifdef unix
1400         free (pagezero);
1401 #else
1402         VirtualFree (pagezero, 0, MEM_RELEASE);
1403 #endif
1404
1405         // mlock the first segment of 64K pages
1406
1407         flag = PROT_READ | PROT_WRITE;
1408         mgr->page0 = mmap (0, (uid)(mgr->redopage + redopages) << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1409
1410         if( mgr->page0 == MAP_FAILED ) {
1411                 fprintf (stderr, "Unable to mmap pagezero btree segment, error = %d\n", errno);
1412                 return bt_mgrclose (mgr), NULL;
1413         }
1414
1415         mgr->pagezero = (BtPageZero *)mgr->page0;
1416         mlock (mgr->pagezero, mgr->page_size);
1417
1418         mgr->redobuff = mgr->page0 + (mgr->redopage << mgr->page_bits);
1419         mlock (mgr->redobuff, redopages << mgr->page_bits);
1420
1421         //      allocate pool buffers
1422
1423         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1424         if( mgr->pagepool == MAP_FAILED ) {
1425                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1426                 return bt_mgrclose (mgr), NULL;
1427         }
1428
1429         mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1430         if( mgr->leafpool == MAP_FAILED ) {
1431                 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1432                 return bt_mgrclose (mgr), NULL;
1433         }
1434
1435         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1436         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1437         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1438
1439         mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1440         mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1441         mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1442
1443         for( idx = 1; idx < mgr->leaftotal; idx++ ) {
1444                 latch = mgr->leafsets + idx;
1445                 latch->leaf = 1;
1446         }
1447
1448         return mgr;
1449 }
1450
1451 //      open BTree access method
1452 //      based on buffer manager
1453
1454 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1455 {
1456 BtDb *bt = malloc (sizeof(*bt));
1457
1458         memset (bt, 0, sizeof(*bt));
1459         bt->main = main;
1460         bt->mgr = mgr;
1461 #ifdef unix
1462         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1463 #else
1464         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1465 #endif
1466         return bt;
1467 }
1468
1469 //  compare two keys, return > 0, = 0, or < 0
1470 //  =0: keys are same
1471 //  -1: key2 > key1
1472 //  +1: key2 < key1
1473 //  as the comparison value
1474
1475 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1476 {
1477 uint len1 = key1->len;
1478 int ans;
1479
1480         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1481                 return ans;
1482
1483         if( len1 > len2 )
1484                 return 1;
1485         if( len1 < len2 )
1486                 return -1;
1487
1488         return 0;
1489 }
1490
1491 // place write, read, or parent lock on requested page_no.
1492
1493 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1494 {
1495         switch( mode ) {
1496         case BtLockRead:
1497                 ReadLock (latch->readwr, thread_no);
1498                 break;
1499         case BtLockWrite:
1500                 WriteLock (latch->readwr, thread_no, line);
1501                 break;
1502         case BtLockAccess:
1503                 ReadLock (latch->access, thread_no);
1504                 break;
1505         case BtLockDelete:
1506                 WriteLock (latch->access, thread_no, line);
1507                 break;
1508         case BtLockParent:
1509                 WriteLock (latch->parent, thread_no, line);
1510                 break;
1511         case BtLockLink:
1512                 WriteLock (latch->link, thread_no, line);
1513                 break;
1514         }
1515 }
1516
1517 // remove write, read, or parent lock on requested page
1518
1519 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1520 {
1521         switch( mode ) {
1522         case BtLockRead:
1523                 ReadRelease (latch->readwr);
1524                 break;
1525         case BtLockWrite:
1526                 WriteRelease (latch->readwr);
1527                 break;
1528         case BtLockAccess:
1529                 ReadRelease (latch->access);
1530                 break;
1531         case BtLockDelete:
1532                 WriteRelease (latch->access);
1533                 break;
1534         case BtLockParent:
1535                 WriteRelease (latch->parent);
1536                 break;
1537         case BtLockLink:
1538                 WriteRelease (latch->link);
1539                 break;
1540         }
1541 }
1542
1543 //      allocate a new page
1544 //      return with page latched, but unlocked.
1545
1546 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1547 {
1548 uint page_size = mgr->page_size, leaf_xtra = 0;
1549 unsigned char *freechain;
1550 uid page_no;
1551
1552         if( contents->lvl ) {
1553                 freechain = mgr->pagezero->freechain;
1554                 mgr->pagezero->upperpages++;
1555         } else {
1556                 freechain = mgr->pagezero->leafchain;
1557                 mgr->pagezero->leafpages++;
1558                 leaf_xtra = mgr->leaf_xtra;
1559                 page_size <<= leaf_xtra;
1560         }
1561
1562         //      lock allocation page
1563
1564         bt_mutexlock(mgr->lock);
1565
1566         // use empty chain first
1567         // else allocate new page
1568
1569         if( page_no = bt_getid(freechain) ) {
1570                 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1571                         set->page = bt_mappage (mgr, set->latch);
1572                 else
1573                         return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1574
1575                 bt_putid(freechain, bt_getid(set->page->right));
1576
1577                 //  the page is currently nopromote and this
1578                 //  will keep bt_promote out.
1579
1580                 //      contents will replace this bit
1581                 //  and pin will keep bt_promote out
1582
1583                 contents->page_no = page_no;
1584                 contents->nopromote = 0;
1585
1586                 memcpy (set->page, contents, page_size);
1587
1588 //              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1589 //                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1590
1591                 bt_releasemutex(mgr->lock);
1592                 return 0;
1593         }
1594
1595         page_no = bt_getid(mgr->pagezero->alloc->right);
1596         bt_putid(mgr->pagezero->alloc->right, page_no+(1 << leaf_xtra));
1597
1598         // unlock allocation latch and
1599         //      extend file into new page.
1600
1601 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1602 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1603
1604         bt_releasemutex(mgr->lock);
1605
1606         //      keep bt_promote out of this page
1607         contents->nopromote = 1;
1608         contents->page_no = page_no;
1609
1610         if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1611                 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1612
1613         if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1614                 set->page = bt_mappage (mgr, set->latch);
1615         else
1616                 return mgr->err_thread = thread_no, mgr->err;
1617
1618         // now pin will keep bt_promote out
1619
1620         set->page->nopromote = 0;
1621         return 0;
1622 }
1623
1624 //  find slot in page for given key at a given level
1625
1626 int bt_findslot (BtPage page, unsigned char *key, uint len)
1627 {
1628 uint diff, higher = page->cnt, low = 1, slot;
1629 uint good = 0;
1630
1631         //        make stopper key an infinite fence value
1632
1633         if( bt_getid (page->right) )
1634                 higher++;
1635         else
1636                 good++;
1637
1638         //      low is the lowest candidate.
1639         //  loop ends when they meet
1640
1641         //  higher is already
1642         //      tested as .ge. the passed key.
1643
1644         while( diff = higher - low ) {
1645                 slot = low + ( diff >> 1 );
1646                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1647                         low = slot + 1;
1648                 else
1649                         higher = slot, good++;
1650         }
1651
1652         //      return zero if key is on right link page
1653
1654         return good ? higher : 0;
1655 }
1656
1657 //  find and load page at given level for given key
1658 //      leave page rd or wr locked as requested
1659
1660 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1661 {
1662 uid page_no = ROOT_page, prevpage_no = 0;
1663 uint drill = 0xff, slot;
1664 BtLatchSet *prevlatch;
1665 uint mode, prevmode;
1666 BtPage prevpage;
1667 BtVal *val;
1668 BtKey *ptr;
1669
1670   //  start at root of btree and drill down
1671
1672   do {
1673         // determine lock mode of drill level
1674         mode = (drill == lvl) ? lock : BtLockRead; 
1675
1676         if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1677           set->page = bt_mappage (mgr, set->latch);
1678         else
1679           return 0;
1680
1681         // obtain access lock using lock chaining with Access mode
1682
1683         if( page_no > ROOT_page )
1684           bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1685
1686         //      release & unpin parent or left sibling page
1687
1688         if( prevpage_no ) {
1689           bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
1690           bt_unpinlatch (prevlatch, 0, thread_no, __LINE__);
1691           prevpage_no = 0;
1692         }
1693
1694         // obtain mode lock using lock coupling through AccessLock
1695
1696         bt_lockpage(mode, set->latch, thread_no, __LINE__);
1697
1698         // grab our fence key
1699
1700         ptr=keyptr(set->page,set->page->cnt);
1701
1702         if( set->page->free )
1703                 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1704
1705         if( page_no > ROOT_page )
1706           bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1707
1708         // re-read and re-lock root after determining actual level of root
1709
1710         if( set->page->lvl != drill) {
1711                 if( set->latch->page_no != ROOT_page )
1712                         return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1713                         
1714                 drill = set->page->lvl;
1715
1716                 if( lock != BtLockRead && drill == lvl ) {
1717                   bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1718                   bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
1719                   continue;
1720                 }
1721         }
1722
1723         prevpage_no = set->latch->page_no;
1724         prevlatch = set->latch;
1725         prevpage = set->page;
1726         prevmode = mode;
1727
1728         //  if requested key is beyond our fence,
1729         //      slide to the right
1730
1731         if( keycmp (ptr, key, len) < 0 )
1732           if( page_no = bt_getid(set->page->right) )
1733                 continue;
1734
1735         //  if page is part of a delete operation,
1736         //      slide to the left;
1737
1738         if( set->page->kill ) {
1739           bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1740           page_no = bt_getid(set->page->left);
1741           bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1742           continue;
1743         }
1744
1745         //  find key on page at this level
1746         //  and descend to requested level
1747
1748         if( slot = bt_findslot (set->page, key, len) ) {
1749           if( drill == lvl )
1750                 return slot;
1751
1752           // find next non-dead slot -- the fence key if nothing else
1753
1754           while( slotptr(set->page, slot)->dead )
1755                 if( slot++ < set->page->cnt )
1756                   continue;
1757                 else
1758                   return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1759
1760           val = valptr(set->page, slot);
1761
1762           if( val->len == BtId )
1763                 page_no = bt_getid(val->value);
1764           else
1765                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1766
1767           drill--;
1768           continue;
1769          }
1770
1771         //  slide right into next page
1772
1773         page_no = bt_getid(set->page->right);
1774
1775   } while( page_no );
1776
1777   // return error on end of right chain
1778
1779   mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1780   return 0;     // return error
1781 }
1782
1783 //      return page to free list
1784 //      page must be delete & write locked
1785 //      and have no keys pointing to it.
1786
1787 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1788 {
1789 unsigned char *freechain;
1790
1791         if( set->page->lvl ) {
1792                 freechain = mgr->pagezero->freechain;
1793                 mgr->pagezero->upperpages--;
1794         } else {
1795                 freechain = mgr->pagezero->leafchain;
1796                 mgr->pagezero->leafpages--;
1797         }
1798
1799         //      lock allocation page
1800
1801         bt_mutexlock (mgr->lock);
1802
1803         //      store chain
1804
1805         memcpy(set->page->right, freechain, BtId);
1806         bt_putid(freechain, set->latch->page_no);
1807         set->page->free = 1;
1808
1809 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1810 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1811
1812         // unlock released page
1813         // and unlock allocation page
1814
1815         bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1816         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1817         bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
1818         bt_releasemutex (mgr->lock);
1819 }
1820
1821 //      a fence key was deleted from a page
1822 //      push new fence value upwards
1823
1824 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1825 {
1826 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1827 unsigned char value[BtId];
1828 BtKey* ptr;
1829 uint idx;
1830
1831         //      remove the old fence value
1832
1833         ptr = keyptr(set->page, set->page->cnt);
1834         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1835         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1836
1837         //  cache new fence value
1838
1839         ptr = keyptr(set->page, set->page->cnt);
1840         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1841
1842         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1843         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1844
1845         //      insert new (now smaller) fence key
1846
1847         bt_putid (value, set->latch->page_no);
1848         ptr = (BtKey*)leftkey;
1849
1850         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1851           return mgr->err_thread = thread_no, mgr->err;
1852
1853         //      now delete old fence key
1854
1855         ptr = (BtKey*)rightkey;
1856
1857         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1858                 return mgr->err_thread = thread_no, mgr->err;
1859
1860         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1861         bt_unpinlatch(set->latch, 1, thread_no, __LINE__);
1862         return 0;
1863 }
1864
1865 //      root has a single child
1866 //      collapse a level from the tree
1867
1868 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1869 {
1870 BtPageSet child[1];
1871 uid page_no;
1872 BtVal *val;
1873 uint idx;
1874
1875   // find the child entry and promote as new root contents
1876
1877   do {
1878         for( idx = 0; idx++ < root->page->cnt; )
1879           if( !slotptr(root->page, idx)->dead )
1880                 break;
1881
1882         val = valptr(root->page, idx);
1883
1884         if( val->len == BtId )
1885                 page_no = bt_getid (valptr(root->page, idx)->value);
1886         else
1887                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1888
1889         if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1890                 child->page = bt_mappage (mgr, child->latch);
1891         else
1892                 return mgr->err_thread = thread_no, mgr->err;
1893
1894         bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1895         bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
1896
1897         memcpy (root->page, child->page, mgr->page_size);
1898         bt_freepage (mgr, child, thread_no);
1899
1900   } while( root->page->lvl > 1 && root->page->act == 1 );
1901
1902   bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1903   bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
1904   return 0;
1905 }
1906
1907 //  delete a page and manage key
1908 //  call with page writelocked
1909
1910 //      returns with page unpinned
1911 //      from the page pool.
1912
1913 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
1914 {
1915 unsigned char lowerfence[BT_keyarray];
1916 uint page_size = mgr->page_size;
1917 BtPageSet right[1], temp[1];
1918 unsigned char value[BtId];
1919 uid page_no, right2;
1920 BtKey *ptr;
1921
1922         if( !lvl )
1923                 page_size <<= mgr->leaf_xtra;
1924
1925         //  cache copy of original fence key
1926         //      that is being deleted.
1927
1928         ptr = keyptr(set->page, set->page->cnt);
1929         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1930
1931         //      obtain locks on right page
1932
1933         page_no = bt_getid(set->page->right);
1934
1935         if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1936                 right->page = bt_mappage (mgr, right->latch);
1937         else
1938                 return 0;
1939
1940         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
1941
1942         if( right->page->kill )
1943                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1944
1945         // pull contents of right peer into our empty page
1946         //      preserving our left page number, and its right page number.
1947
1948         bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
1949         page_no = bt_getid(set->page->left);
1950         memcpy (set->page, right->page, page_size);
1951         bt_putid (set->page->left, page_no);
1952         bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
1953
1954         set->page->page_no = set->latch->page_no;
1955
1956         //  fix left link from far right page
1957
1958         if( right2 = bt_getid (right->page->right) ) {
1959           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
1960                 temp->page = bt_mappage (mgr, temp->latch);
1961           else
1962                 return 0;
1963
1964       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
1965           bt_putid (temp->page->left, set->latch->page_no);
1966           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
1967           bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
1968         } else if( !lvl ) {     // our page is now rightmost leaf
1969           bt_mutexlock (mgr->lock);
1970           bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
1971           bt_releasemutex(mgr->lock);
1972         }
1973
1974         // mark right page deleted and release lock
1975
1976         right->page->kill = 1;
1977         bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
1978
1979         // redirect higher key directly to our new node contents
1980
1981         ptr = keyptr(right->page, right->page->cnt);
1982         bt_putid (value, set->latch->page_no);
1983
1984         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
1985           return mgr->err;
1986
1987         //      delete our orignal fence key in parent
1988         //      and unlock our page.
1989
1990         ptr = (BtKey *)lowerfence;
1991
1992         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1993           return mgr->err;
1994
1995         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1996         bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
1997
1998         //      obtain delete and write locks to right node
1999
2000         bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
2001         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2002         bt_freepage (mgr, right, thread_no);
2003         return 0;
2004 }
2005
2006 //  find and delete key on page by marking delete flag bit
2007 //  if page becomes empty, delete it from the btree
2008
2009 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2010 {
2011 uint slot, idx, found, fence;
2012 BtPageSet set[1];
2013 BtSlot *node;
2014 BtKey *ptr;
2015 BtVal *val;
2016
2017         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2018                 node = slotptr(set->page, slot);
2019                 ptr = keyptr(set->page, slot);
2020         } else
2021                 return mgr->err_thread = thread_no, mgr->err;
2022
2023         // if librarian slot, advance to real slot
2024
2025         if( node->type == Librarian ) {
2026                 ptr = keyptr(set->page, ++slot);
2027                 node = slotptr(set->page, slot);
2028         }
2029
2030         fence = slot == set->page->cnt;
2031
2032         // delete the key, ignore request if already dead
2033
2034         if( found = !keycmp (ptr, key, len) )
2035           if( found = node->dead == 0 ) {
2036                 val = valptr(set->page,slot);
2037                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2038                 set->page->act--;
2039                 node->dead = 1;
2040
2041                 // collapse empty slots beneath the fence
2042                 // on interiour nodes
2043
2044                 if( lvl )
2045                  while( idx = set->page->cnt - 1 )
2046                   if( slotptr(set->page, idx)->dead ) {
2047                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2048                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2049                   } else
2050                         break;
2051           }
2052
2053         if( !found )
2054                 return 0;
2055
2056         //      did we delete a fence key in an upper level?
2057
2058         if( lvl && set->page->act && fence )
2059           if( bt_fixfence (mgr, set, lvl, thread_no) )
2060                 return mgr->err_thread = thread_no, mgr->err;
2061           else
2062                 return 0;
2063
2064         //      do we need to collapse root?
2065
2066         if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2067           if( bt_collapseroot (mgr, set, thread_no) )
2068                 return mgr->err_thread = thread_no, mgr->err;
2069           else
2070                 return 0;
2071
2072         //      delete empty page
2073
2074         if( !set->page->act )
2075           return bt_deletepage (mgr, set, thread_no, set->page->lvl);
2076
2077         bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2078         bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2079         return 0;
2080 }
2081
2082 //      check page for space available,
2083 //      clean if necessary and return
2084 //      0 - page needs splitting
2085 //      >0  new slot value
2086
2087 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2088 {
2089 uint page_size = mgr->page_size;
2090 BtPage page = set->page, frame;
2091 uint cnt = 0, idx = 0;
2092 uint max = page->cnt;
2093 uint newslot = max;
2094 BtKey *key;
2095 BtVal *val;
2096
2097         if( !set->page->lvl )
2098                 page_size <<= mgr->leaf_xtra;
2099
2100         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2101                 return slot;
2102
2103         //      skip cleanup and proceed to split
2104         //      if there's not enough garbage
2105         //      to bother with.
2106
2107         if( page->garbage < page_size / 5 )
2108                 return 0;
2109
2110         frame = malloc (page_size);
2111         memcpy (frame, page, page_size);
2112
2113         // skip page info and set rest of page to zero
2114
2115         memset (page+1, 0, page_size - sizeof(*page));
2116
2117         page->min = page_size;
2118         page->garbage = 0;
2119         page->act = 0;
2120
2121         // clean up page first by
2122         // removing deleted keys
2123
2124         while( cnt++ < max ) {
2125                 if( cnt == slot )
2126                         newslot = idx + 2;
2127
2128                 if( cnt < max || frame->lvl )
2129                   if( slotptr(frame,cnt)->dead )
2130                         continue;
2131
2132                 // copy the value across
2133
2134                 val = valptr(frame, cnt);
2135                 page->min -= val->len + sizeof(BtVal);
2136                 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2137
2138                 // copy the key across
2139
2140                 key = keyptr(frame, cnt);
2141                 page->min -= key->len + sizeof(BtKey);
2142                 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2143
2144                 // make a librarian slot
2145
2146                 slotptr(page, ++idx)->off = page->min;
2147                 slotptr(page, idx)->type = Librarian;
2148                 slotptr(page, idx)->dead = 1;
2149
2150                 // set up the slot
2151
2152                 slotptr(page, ++idx)->off = page->min;
2153                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2154
2155                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2156                         page->act++;
2157         }
2158
2159         page->cnt = idx;
2160         free (frame);
2161
2162         //      see if page has enough space now, or does it need splitting?
2163
2164         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2165                 return newslot;
2166
2167         return 0;
2168 }
2169
2170 // split the root and raise the height of the btree
2171
2172 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2173 {  
2174 unsigned char leftkey[BT_keyarray];
2175 uint nxt = mgr->page_size;
2176 unsigned char value[BtId];
2177 BtPage frame, page;
2178 BtPageSet left[1];
2179 uid left_page_no;
2180 BtKey *ptr;
2181 BtVal *val;
2182
2183         frame = malloc (mgr->page_size);
2184         memcpy (frame, root->page, mgr->page_size);
2185
2186         //      save left page fence key for new root
2187
2188         ptr = keyptr(root->page, root->page->cnt);
2189         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2190
2191         //  Obtain an empty page to use, and copy the current
2192         //  root contents into it, e.g. lower keys
2193
2194         if( bt_newpage(mgr, left, frame, thread_no) )
2195                 return mgr->err_thread = thread_no, mgr->err;
2196
2197         left_page_no = left->latch->page_no;
2198         bt_unpinlatch (left->latch, 1, thread_no, __LINE__);
2199         free (frame);
2200
2201         //      left link the pages together
2202
2203         page = bt_mappage (mgr, right);
2204         bt_putid (page->left, left_page_no);
2205
2206         // preserve the page info at the bottom
2207         // of higher keys and set rest to zero
2208
2209         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2210
2211         // insert stopper key at top of newroot page
2212         // and increase the root height
2213
2214         nxt -= BtId + sizeof(BtVal);
2215         bt_putid (value, right->page_no);
2216         val = (BtVal *)((unsigned char *)root->page + nxt);
2217         memcpy (val->value, value, BtId);
2218         val->len = BtId;
2219
2220         nxt -= 2 + sizeof(BtKey);
2221         slotptr(root->page, 2)->off = nxt;
2222         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2223         ptr->len = 2;
2224         ptr->key[0] = 0xff;
2225         ptr->key[1] = 0xff;
2226
2227         // insert lower keys page fence key on newroot page as first key
2228
2229         nxt -= BtId + sizeof(BtVal);
2230         bt_putid (value, left_page_no);
2231         val = (BtVal *)((unsigned char *)root->page + nxt);
2232         memcpy (val->value, value, BtId);
2233         val->len = BtId;
2234
2235         ptr = (BtKey *)leftkey;
2236         nxt -= ptr->len + sizeof(BtKey);
2237         slotptr(root->page, 1)->off = nxt;
2238         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2239         
2240         bt_putid(root->page->right, 0);
2241         root->page->min = nxt;          // reset lowest used offset and key count
2242         root->page->cnt = 2;
2243         root->page->act = 2;
2244         root->page->lvl++;
2245
2246         mgr->pagezero->alloc->lvl = root->page->lvl;
2247
2248         // release and unpin root pages
2249
2250         bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2251         bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
2252
2253         bt_unpinlatch (right, 1, thread_no, __LINE__);
2254         return 0;
2255 }
2256
2257 //  split already locked full node
2258 //      leave it locked.
2259 //      return pool entry for new right
2260 //      page, pinned & unlocked
2261
2262 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
2263 {
2264 uint page_size = mgr->page_size;
2265 BtPageSet right[1], temp[1];
2266 uint cnt = 0, idx = 0, max;
2267 uint lvl = set->page->lvl;
2268 BtPage frame;
2269 BtKey *key;
2270 BtVal *val;
2271 uid right2;
2272 uint entry;
2273 uint prev;
2274
2275         if( !set->page->lvl )
2276                 page_size <<= mgr->leaf_xtra;
2277
2278         //  split higher half of keys to frame
2279
2280         frame = malloc (page_size);
2281         memset (frame, 0, page_size);
2282         frame->min = page_size;
2283         max = set->page->cnt;
2284         cnt = max / 2;
2285         idx = 0;
2286
2287         while( cnt++ < max ) {
2288                 if( cnt < max || set->page->lvl )
2289                   if( slotptr(set->page, cnt)->dead )
2290                         continue;
2291
2292                 val = valptr(set->page, cnt);
2293                 frame->min -= val->len + sizeof(BtVal);
2294                 memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
2295
2296                 key = keyptr(set->page, cnt);
2297                 frame->min -= key->len + sizeof(BtKey);
2298                 memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
2299
2300                 //      add librarian slot
2301
2302                 slotptr(frame, ++idx)->off = frame->min;
2303                 slotptr(frame, idx)->type = Librarian;
2304                 slotptr(frame, idx)->dead = 1;
2305
2306                 //  add actual slot
2307
2308                 slotptr(frame, ++idx)->off = frame->min;
2309                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2310
2311                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2312                         frame->act++;
2313         }
2314
2315         frame->cnt = idx;
2316         frame->lvl = lvl;
2317
2318         // link right node
2319
2320         if( set->latch->page_no > ROOT_page ) {
2321                 right2 = bt_getid (set->page->right);
2322                 bt_putid (frame->right, right2);
2323
2324                 if( linkleft )
2325                         bt_putid (frame->left, set->latch->page_no);
2326         }
2327
2328         //      get new free page and write higher keys to it.
2329
2330         if( bt_newpage(mgr, right, frame, thread_no) )
2331                 return 0;
2332
2333         //      link far right's left pointer to new page
2334
2335         if( linkleft && set->latch->page_no > ROOT_page )
2336          if( right2 ) {
2337           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2338                 temp->page = bt_mappage (mgr, temp->latch);
2339           else
2340                 return 0;
2341
2342       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2343           bt_putid (temp->page->left, right->latch->page_no);
2344           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2345           bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
2346          } else if( !lvl ) {    // page is rightmost leaf
2347           bt_mutexlock (mgr->lock);
2348           bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
2349           bt_releasemutex(mgr->lock);
2350          }
2351
2352         // process lower keys
2353
2354         memcpy (frame, set->page, page_size);
2355         memset (set->page+1, 0, page_size - sizeof(*set->page));
2356
2357         set->page->min = page_size;
2358         set->page->garbage = 0;
2359         set->page->act = 0;
2360         max /= 2;
2361         cnt = 0;
2362         idx = 0;
2363
2364         //  assemble page of smaller keys
2365
2366         while( cnt++ < max ) {
2367                 if( slotptr(frame, cnt)->dead )
2368                         continue;
2369                 val = valptr(frame, cnt);
2370                 set->page->min -= val->len + sizeof(BtVal);
2371                 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2372
2373                 key = keyptr(frame, cnt);
2374                 set->page->min -= key->len + sizeof(BtKey);
2375                 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2376
2377                 //      add librarian slot
2378
2379                 slotptr(set->page, ++idx)->off = set->page->min;
2380                 slotptr(set->page, idx)->type = Librarian;
2381                 slotptr(set->page, idx)->dead = 1;
2382
2383                 //      add actual slot
2384
2385                 slotptr(set->page, ++idx)->off = set->page->min;
2386                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2387                 set->page->act++;
2388         }
2389
2390         bt_putid(set->page->right, right->latch->page_no);
2391         set->page->cnt = idx;
2392         free(frame);
2393
2394         entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2395         return entry;
2396 }
2397
2398 //      fix keys for newly split page
2399 //      call with both pages pinned & locked
2400 //  return unlocked and unpinned
2401
2402 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2403 {
2404 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2405 unsigned char value[BtId];
2406 uint lvl = set->page->lvl;
2407 BtPageSet temp[1];
2408 BtPage page;
2409 BtKey *ptr;
2410 uid right2;
2411
2412         // if current page is the root page, split it
2413
2414         if( set->latch->page_no == ROOT_page )
2415                 return bt_splitroot (mgr, set, right, thread_no);
2416
2417         ptr = keyptr(set->page, set->page->cnt);
2418         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2419
2420         page = bt_mappage (mgr, right);
2421
2422         ptr = keyptr(page, page->cnt);
2423         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2424
2425         //      splice in far right page's left page_no
2426
2427         if( right2 = bt_getid (page->right) ) {
2428           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2429                 temp->page = bt_mappage (mgr, temp->latch);
2430           else
2431                 return 0;
2432
2433       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2434           bt_putid (temp->page->left, right->page_no);
2435           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2436           bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
2437         } else if( !lvl ) {  // right page is far right page
2438           bt_mutexlock (mgr->lock);
2439           bt_putid (mgr->pagezero->alloc->left, right->page_no);
2440           bt_releasemutex(mgr->lock);
2441         }
2442         // insert new fences in their parent pages
2443
2444         bt_lockpage (BtLockParent, right, thread_no, __LINE__);
2445
2446         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2447         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2448
2449         // insert new fence for reformulated left block of smaller keys
2450
2451         bt_putid (value, set->latch->page_no);
2452         ptr = (BtKey *)leftkey;
2453
2454         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2455                 return mgr->err_thread = thread_no, mgr->err;
2456
2457         // switch fence for right block of larger keys to new right page
2458
2459         bt_putid (value, right->page_no);
2460         ptr = (BtKey *)rightkey;
2461
2462         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2463                 return mgr->err_thread = thread_no, mgr->err;
2464
2465         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2466         bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2467
2468         bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2469         bt_unpinlatch (right, 1, thread_no, __LINE__);
2470         return 0;
2471 }
2472
2473 //      install new key and value onto page
2474 //      page must already be checked for
2475 //      adequate space
2476
2477 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2478 {
2479 uint idx, librarian;
2480 BtSlot *node;
2481 BtKey *ptr;
2482 BtVal *val;
2483 int rate;
2484
2485         //      if previous slot is a librarian slot, use it
2486
2487         if( slot > 1 )
2488           if( slotptr(set->page, slot-1)->type == Librarian )
2489                 slot--;
2490
2491         // copy value onto page
2492
2493         set->page->min -= vallen + sizeof(BtVal);
2494         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2495         memcpy (val->value, value, vallen);
2496         val->len = vallen;
2497
2498         // copy key onto page
2499
2500         set->page->min -= keylen + sizeof(BtKey);
2501         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2502         memcpy (ptr->key, key, keylen);
2503         ptr->len = keylen;
2504         
2505         //      find first empty slot at or above our insert slot
2506
2507         for( idx = slot; idx < set->page->cnt; idx++ )
2508           if( slotptr(set->page, idx)->dead )
2509                 break;
2510
2511         // now insert key into array before slot.
2512
2513         //      if we're going all the way to the top,
2514         //      add as many librarian slots as
2515         //      makes sense.
2516
2517         if( idx == set->page->cnt ) {
2518         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2519
2520                 librarian = ++idx - slot;
2521                 avail /= sizeof(BtSlot);
2522
2523                 if( avail < 0 )
2524                         avail = 0;
2525
2526                 if( librarian > avail )
2527                         librarian = avail;
2528
2529                 if( librarian ) {
2530                         rate = (idx - slot) / librarian;
2531                         set->page->cnt += librarian;
2532                         idx += librarian;
2533                 } else
2534                         rate = 0;
2535         } else
2536                 librarian = 0, rate = 0;
2537
2538         //  transfer slots and add librarian slots
2539
2540         while( idx > slot ) {
2541                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2542
2543                 //      add librarian slot per rate
2544
2545                 if( librarian )
2546                  if( (idx - slot)/2 <= librarian * rate ) {
2547                         node = slotptr(set->page, --idx);
2548                         node->off = node[1].off;
2549                         node->type = Librarian;
2550                         node->dead = 1;
2551                         librarian--;
2552                   }
2553
2554                 --idx;
2555         }
2556
2557         set->page->act++;
2558
2559         //      fill in new slot
2560
2561         node = slotptr(set->page, slot);
2562         node->off = set->page->min;
2563         node->type = type;
2564         node->dead = 0;
2565         return 0;
2566 }
2567
2568 //  Insert new key into the btree at given level.
2569 //      either add a new key or update/add an existing one
2570
2571 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2572 {
2573 uint slot, idx, len, entry;
2574 BtPageSet set[1];
2575 BtSlot *node;
2576 BtKey *ptr;
2577 BtVal *val;
2578
2579   while( 1 ) { // find the page and slot for the current key
2580         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2581                 node = slotptr(set->page, slot);
2582                 ptr = keyptr(set->page, slot);
2583         } else
2584                 return mgr->err;
2585
2586         // if librarian slot == found slot, advance to real slot
2587
2588         if( node->type == Librarian ) {
2589                 node = slotptr(set->page, ++slot);
2590                 ptr = keyptr(set->page, slot);
2591         }
2592
2593         //  if inserting a duplicate key or unique
2594         //      key that doesn't exist on the page,
2595         //      check for adequate space on the page
2596         //      and insert the new key before slot.
2597
2598         switch( type ) {
2599         case Unique:
2600         case Duplicate:
2601           if( !keycmp (ptr, key, keylen) )
2602                 break;
2603
2604           if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2605             if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
2606                   return mgr->err;
2607                 else
2608                   goto insxit;
2609
2610           if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2611             if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2612                   continue;
2613
2614           return mgr->err_thread = thread_no, mgr->err;
2615
2616         case Update:
2617           if( keycmp (ptr, key, keylen) )
2618                 goto insxit;
2619
2620           break;
2621         }
2622
2623         // if key already exists, update value and return
2624
2625         val = valptr(set->page, slot);
2626
2627         if( val->len >= vallen ) {
2628           if( node->dead )
2629                 set->page->act++;
2630           node->type = type;
2631           node->dead = 0;
2632
2633           set->page->garbage += val->len - vallen;
2634           val->len = vallen;
2635           memcpy (val->value, value, vallen);
2636           goto insxit;
2637         }
2638
2639         //  new update value doesn't fit in existing value area
2640         //      make sure page has room
2641
2642         if( !node->dead )
2643           set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2644         else
2645           set->page->act++;
2646
2647         node->type = type;
2648         node->dead = 0;
2649
2650         if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2651           break;
2652
2653         if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2654           if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2655                 continue;
2656
2657         return mgr->err_thread = thread_no, mgr->err;
2658   }
2659
2660   //  copy key and value onto page and update slot
2661
2662   set->page->min -= vallen + sizeof(BtVal);
2663   val = (BtVal*)((unsigned char *)set->page + set->page->min);
2664   memcpy (val->value, value, vallen);
2665   val->len = vallen;
2666
2667   set->page->min -= keylen + sizeof(BtKey);
2668   ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2669   memcpy (ptr->key, key, keylen);
2670   ptr->len = keylen;
2671         
2672   slotptr(set->page,slot)->off = set->page->min;
2673
2674 insxit:
2675   bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2676   bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2677   return 0;
2678 }
2679
2680 //      determine actual page where key is located
2681 //  return slot number
2682
2683 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2684 {
2685 BtKey *key = keyptr(source,src), *ptr;
2686 uint slot = locks[src].slot;
2687 uint entry;
2688
2689         if( locks[src].reuse )
2690           entry = locks[src-1].entry;
2691         else
2692           entry = locks[src].entry;
2693
2694         if( slot ) {
2695                 set->latch = mgr->leafsets + entry;
2696                 set->page = bt_mappage (mgr, set->latch);
2697                 return slot;
2698         }
2699
2700         //      find where our key is located 
2701         //      on current page or pages split on
2702         //      same page txn operations.
2703
2704         do {
2705                 set->latch = mgr->leafsets + entry;
2706                 set->page = bt_mappage (mgr, set->latch);
2707
2708                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2709                   if( slotptr(set->page, slot)->type == Librarian )
2710                         slot++;
2711                   if( locks[src].reuse )
2712                         locks[src].entry = entry;
2713                   return slot;
2714                 }
2715         } while( entry = set->latch->split );
2716
2717         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2718         return 0;
2719 }
2720
2721 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2722 {
2723 BtKey *key = keyptr(source, src);
2724 BtVal *val = valptr(source, src);
2725 BtLatchSet *latch;
2726 BtPageSet set[1];
2727 uint entry, slot;
2728
2729   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2730         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2731           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
2732                 return mgr->err_thread = thread_no, mgr->err;
2733
2734           set->page->lsn = lsn;
2735           return 0;
2736         }
2737
2738         //  split page
2739
2740         if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2741           latch = mgr->leafsets + entry;
2742         else
2743           return mgr->err;
2744
2745         //      splice right page into split chain
2746         //      and WriteLock it
2747
2748         bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2749         latch->split = set->latch->split;
2750         set->latch->split = entry;
2751
2752         // clear slot number for atomic page
2753
2754         locks[src].slot = 0;
2755   }
2756
2757   return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2758 }
2759
2760 //      perform delete from smaller btree
2761 //  insert a delete slot if not found there
2762
2763 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2764 {
2765 BtKey *key = keyptr(source, src);
2766 BtPageSet set[1];
2767 uint idx, slot;
2768 BtSlot *node;
2769 BtKey *ptr;
2770 BtVal *val;
2771
2772         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2773           node = slotptr(set->page, slot);
2774           ptr = keyptr(set->page, slot);
2775           val = valptr(set->page, slot);
2776         } else
2777           return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2778
2779         //  if slot is not found, insert a delete slot
2780
2781         if( keycmp (ptr, key->key, key->len) )
2782           if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
2783                 return mgr->err;
2784
2785         //      if node is already dead,
2786         //      ignore the request.
2787
2788         if( node->dead )
2789                 return 0;
2790
2791         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2792         set->page->lsn = lsn;
2793         set->page->act--;
2794
2795         node->dead = 0;
2796         __sync_fetch_and_add(&mgr->found, 1);
2797         return 0;
2798 }
2799
2800 //      release master's splits from right to left
2801
2802 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2803 {
2804 BtLatchSet *latch = mgr->leafsets + entry;
2805
2806         if( latch->split )
2807                 bt_atomicrelease (mgr, latch->split, thread_no);
2808
2809         latch->split = 0;
2810         bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2811         bt_unpinlatch(latch, 1, thread_no, __LINE__);
2812 }
2813
2814 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2815 {
2816 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2817 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2818
2819         return keycmp (key1, key2->key, key2->len);
2820 }
2821 //      atomic modification of a batch of keys.
2822
2823 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2824 {
2825 uint src, idx, slot, samepage, entry, que = 0;
2826 BtKey *key, *ptr, *key2;
2827 int result = 0;
2828 BtSlot temp[1];
2829 logseqno lsn;
2830 int type;
2831
2832   // stable sort the list of keys into order to
2833   //    prevent deadlocks between threads.
2834 /*
2835   for( src = 1; src++ < source->cnt; ) {
2836         *temp = *slotptr(source,src);
2837         key = keyptr (source,src);
2838
2839         for( idx = src; --idx; ) {
2840           key2 = keyptr (source,idx);
2841           if( keycmp (key, key2->key, key2->len) < 0 ) {
2842                 *slotptr(source,idx+1) = *slotptr(source,idx);
2843                 *slotptr(source,idx) = *temp;
2844           } else
2845                 break;
2846         }
2847   }
2848 */
2849         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2850   //  add entries to redo log
2851
2852   if( bt->mgr->pagezero->redopages )
2853         lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2854   else
2855         lsn = 0;
2856
2857   //  perform the individual actions in the transaction
2858
2859   if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2860         return bt->mgr->err;
2861
2862   // if number of active pages
2863   // is greater than the buffer pool
2864   // promote page into larger btree
2865
2866   if( bt->main )
2867    while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
2868         if( bt_promote (bt) )
2869           return bt->mgr->err;
2870
2871   // return success
2872
2873   return 0;
2874 }
2875
2876 //      execute the source list of inserts/deletes
2877
2878 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2879 {
2880 uint slot, src, idx, samepage, entry;
2881 BtPageSet set[1], prev[1];
2882 unsigned char value[BtId];
2883 BtLatchSet *latch;
2884 uid right_page_no;
2885 AtomicTxn *locks;
2886 BtKey *key, *ptr;
2887 BtPage page;
2888 BtVal *val;
2889
2890   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2891
2892   // Load the leaf page for each key
2893   // group same page references with reuse bit
2894
2895   for( src = 0; src++ < source->cnt; ) {
2896         key = keyptr(source, src);
2897
2898         // first determine if this modification falls
2899         // on the same page as the previous modification
2900         //      note that the far right leaf page is a special case
2901
2902         if( samepage = src > 1 )
2903           samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
2904
2905         if( !samepage )
2906           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
2907                 ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
2908           else
2909                 return mgr->err;
2910         else
2911           slot = 0;
2912
2913         if( slot )
2914          if( slotptr(set->page, slot)->type == Librarian )
2915           slot++;
2916
2917         entry = set->latch - mgr->leafsets;
2918         locks[src].reuse = samepage;
2919         locks[src].entry = entry;
2920         locks[src].slot = slot;
2921
2922         //      capture current lsn for master page
2923
2924         locks[src].reqlsn = set->page->lsn;
2925   }
2926
2927   // insert or delete each key
2928   // process any splits or merges
2929   // run through txn list backwards
2930
2931   samepage = source->cnt + 1;
2932
2933   for( src = source->cnt; src; src-- ) {
2934         if( locks[src].reuse )
2935           continue;
2936
2937         //  perform the txn operations
2938         //      from smaller to larger on
2939         //  the same page
2940
2941         for( idx = src; idx < samepage; idx++ )
2942          switch( slotptr(source,idx)->type ) {
2943          case Delete:
2944           if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2945                 return mgr->err;
2946           break;
2947
2948         case Duplicate:
2949         case Unique:
2950           if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2951                 return mgr->err;
2952           break;
2953
2954         default:
2955           bt_atomicpage (mgr, source, locks, idx, set);
2956           break;
2957         }
2958
2959         //      after the same page operations have finished,
2960         //  process master page for splits or deletion.
2961
2962         latch = prev->latch = mgr->leafsets + locks[src].entry;
2963         prev->page = bt_mappage (mgr, prev->latch);
2964         samepage = src;
2965
2966         //  pick-up all splits from master page
2967         //      each one is already pinned & WriteLocked.
2968
2969         while( entry = prev->latch->split ) {
2970           set->latch = mgr->leafsets + entry;
2971           set->page = bt_mappage (mgr, set->latch);
2972
2973           // delete empty master page by undoing its split
2974           //  (this is potentially another empty page)
2975           //  note that there are no pointers to it yet
2976
2977           if( !prev->page->act ) {
2978                 memcpy (set->page->left, prev->page->left, BtId);
2979                 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
2980                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
2981                 prev->latch->split = set->latch->split;
2982                 bt_freepage (mgr, set, thread_no);
2983                 continue;
2984           }
2985
2986           // remove empty split page from the split chain
2987           // and return it to the free list. No other
2988           // thread has its page number yet.
2989
2990           if( !set->page->act ) {
2991                 memcpy (prev->page->right, set->page->right, BtId);
2992                 prev->latch->split = set->latch->split;
2993
2994                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
2995                 bt_freepage (mgr, set, thread_no);
2996                 continue;
2997           }
2998
2999           //  update prev's fence key
3000
3001           ptr = keyptr(prev->page,prev->page->cnt);
3002           bt_putid (value, prev->latch->page_no);
3003
3004           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3005                 return mgr->err;
3006
3007           // splice in the left link into the split page
3008
3009           bt_putid (set->page->left, prev->latch->page_no);
3010           *prev = *set;
3011         }
3012
3013         //  update left pointer in next right page from last split page
3014         //      (if all splits were reversed or none occurred, latch->split == 0)
3015
3016         if( latch->split ) {
3017           //  fix left pointer in master's original (now split)
3018           //  far right sibling or set rightmost page in page zero
3019
3020           if( right_page_no = bt_getid (prev->page->right) ) {
3021                 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3022                   set->page = bt_mappage (mgr, set->latch);
3023                 else
3024                   return mgr->err;
3025
3026             bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3027             bt_putid (set->page->left, prev->latch->page_no);
3028             bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3029                 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
3030           } else {      // prev is rightmost page
3031             bt_mutexlock (mgr->lock);
3032                 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3033             bt_releasemutex(mgr->lock);
3034           }
3035
3036           //  switch the original fence key from the
3037           //  master page to the last split page.
3038
3039           ptr = keyptr(prev->page,prev->page->cnt);
3040           bt_putid (value, prev->latch->page_no);
3041
3042           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
3043                 return mgr->err;
3044
3045           //  unlock and unpin the split pages
3046
3047           bt_atomicrelease (mgr, latch->split, thread_no);
3048
3049           //  unlock and unpin the master page
3050
3051           latch->split = 0;
3052           bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3053           bt_unpinlatch(latch, 1, thread_no, __LINE__);
3054           continue;
3055         }
3056
3057         //      since there are no splits remaining, we're
3058         //  finished if master page occupied
3059
3060         if( prev->page->act ) {
3061           bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3062           bt_unpinlatch(prev->latch, 1, thread_no, __LINE__);
3063           continue;
3064         }
3065
3066         // any and all splits were reversed, and the
3067         // master page located in prev is empty, delete it
3068
3069         if( bt_deletepage (mgr, prev, thread_no, 0) )
3070                 return mgr->err;
3071   }
3072
3073   free (locks);
3074   return 0;
3075 }
3076
3077 //  pick & promote a page into the larger btree
3078
3079 BTERR bt_promote (BtDb *bt)
3080 {
3081 uint entry, slot, idx;
3082 BtPageSet set[1];
3083 BtSlot *node;
3084 BtKey *ptr;
3085 BtVal *val;
3086
3087   while( 1 ) {
3088 #ifdef unix
3089         entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3090 #else
3091         entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3092 #endif
3093         entry %= bt->mgr->leaftotal;
3094
3095         if( !entry )
3096                 continue;
3097
3098         set->latch = bt->mgr->leafsets + entry;
3099
3100         //  skip this entry if it has never been used
3101
3102         if( !set->latch->page_no )
3103           continue;
3104
3105         if( !bt_mutextry(set->latch->modify) )
3106                 continue;
3107
3108         //  skip this entry if it is pinned
3109
3110         if( set->latch->pin & ~CLOCK_bit ) {
3111           bt_releasemutex(set->latch->modify);
3112           continue;
3113         }
3114
3115         set->page = bt_mappage (bt->mgr, set->latch);
3116
3117         // entry has no right sibling
3118
3119         if( !bt_getid (set->page->right) ) {
3120           bt_releasemutex(set->latch->modify);
3121           continue;
3122         }
3123
3124         // entry interiour node or being killed or constructed
3125
3126         if( set->page->lvl || set->page->nopromote || set->page->kill ) {
3127           bt_releasemutex(set->latch->modify);
3128           continue;
3129         }
3130
3131         //  pin the page for our access
3132         //      and leave it locked for the
3133         //      duration of the promotion.
3134
3135         set->latch->pin++;
3136         bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3137         bt_releasemutex(set->latch->modify);
3138
3139         // transfer slots in our selected page to the main btree
3140 if( !(entry % 100) )
3141 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
3142
3143         if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3144                 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3145                 return bt->main->err;
3146         }
3147
3148         //  now delete the page
3149
3150         if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3151                 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3152
3153         return bt->mgr->err;
3154   }
3155 }
3156
3157 //      find unique key == given key, or first duplicate key in
3158 //      leaf level and return number of value bytes
3159 //      or (-1) if not found.
3160
3161 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
3162 {
3163 int ret = -1, type;
3164 BtPageSet set[1];
3165 BtSlot *node;
3166 BtKey *ptr;
3167 BtVal *val;
3168 uint slot;
3169
3170  for( type = 0; type < 2; type++ )
3171   if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) {
3172         node = slotptr(set->page, slot);
3173
3174         //      skip librarian slot place holder
3175
3176         if( node->type == Librarian )
3177           node = slotptr(set->page, ++slot);
3178
3179         ptr = keyptr(set->page, slot);
3180
3181         //      not there if we reach the stopper key
3182         //  or the key doesn't match what's on the page.
3183
3184         if( slot == set->page->cnt )
3185           if( !bt_getid (set->page->right) ) {
3186                 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3187                 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3188                 continue;
3189         }
3190
3191         if( keycmp (ptr, key, keylen) ) {
3192                 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3193                 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3194                 continue;
3195         }
3196
3197         // key matches, return >= 0 value bytes copied
3198         //      or -1 if not there.
3199
3200         if( node->type == Delete || node->dead ) {
3201                 ret = -1;
3202                 goto findxit;
3203         }
3204
3205         val = valptr (set->page,slot);
3206
3207         if( valmax > val->len )
3208                 valmax = val->len;
3209
3210         memcpy (value, val->value, valmax);
3211         ret = valmax;
3212         goto findxit;
3213   }
3214
3215   ret = -1;
3216
3217 findxit:
3218   if( type < 2 ) {
3219         bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3220         bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3221   }
3222   return ret;
3223 }
3224
3225 //      set cursor to highest slot on right-most page
3226
3227 BTERR bt_lastkey (BtDb *bt)
3228 {
3229 uid cache_page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3230 uid main_page_no = bt_getid (bt->main->pagezero->alloc->left);
3231
3232         if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) )
3233                 bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
3234         else
3235                 return bt->mgr->err;
3236
3237     bt_lockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3238         bt->cacheslot = bt->cacheset->page->cnt;
3239
3240         if( bt->mainset->latch = bt_pinleaf (bt->main, main_page_no, bt->thread_no) )
3241                 bt->mainset->page = bt_mappage (bt->main, bt->mainset->latch);
3242         else
3243                 return bt->main->err;
3244
3245     bt_lockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3246         bt->mainslot = bt->mainset->page->cnt;
3247         bt->phase = 2;
3248         return 0;
3249 }
3250
3251 //      return previous slot on cursor page
3252
3253 uint bt_prevslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3254 {
3255 uid next, us = set->latch->page_no;
3256
3257   while( 1 ) {
3258         while( --slot )
3259           if( slotptr(set->page, slot)->dead )
3260                 continue;
3261           else
3262                 return slot;
3263
3264         next = bt_getid(set->page->left);
3265
3266         if( !next )
3267                 return 0;
3268
3269         do {
3270           bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3271           bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
3272
3273           if( set->latch = bt_pinleaf (mgr, next, thread_no) )
3274             set->page = bt_mappage (mgr, set->latch);
3275           else
3276             return 0;
3277
3278       bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3279           next = bt_getid (set->page->right);
3280
3281         } while( next != us );
3282
3283     slot = set->page->cnt + 1;
3284   }
3285 }
3286
3287 //  advance to previous key
3288
3289 BTERR bt_prevkey (BtDb *bt)
3290 {
3291 int cmp;
3292
3293   // first advance last key(s) one previous slot
3294
3295   while( 1 ) {
3296         switch( bt->phase ) {
3297         case 0:
3298           bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3299           break;
3300         case 1:
3301           bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3302           break;
3303         case 2:
3304           bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3305           bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3306         break;
3307         }
3308
3309   // return next key
3310
3311         if( bt->cacheslot ) {
3312           bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3313           bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3314           bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3315         }
3316
3317         if( bt->mainslot ) {
3318           bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3319           bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3320           bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3321         }
3322
3323         if( bt->mainslot && bt->cacheslot )
3324           cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3325         else if( bt->cacheslot )
3326           cmp = 1;
3327         else if( bt->mainslot )
3328           cmp = -1;
3329         else
3330           return 0;
3331
3332         //  cache key is larger
3333
3334         if( cmp > 0 ) {
3335           bt->phase = 0;
3336           if( bt->cachenode->type == Delete )
3337                 continue;
3338           return bt->cacheslot;
3339         }
3340
3341         //  main key is larger
3342
3343         if( cmp < 0 ) {
3344           bt->phase = 1;
3345           return bt->mainslot;
3346         }
3347
3348         //      keys are equal
3349
3350         bt->phase = 2;
3351
3352         if( bt->cachenode->type == Delete )
3353                 continue;
3354
3355         return bt->cacheslot;
3356   }
3357 }
3358
3359 //      advance to next slot in cache or main btree
3360 //      return 0 for EOF/error
3361
3362 uint bt_nextslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3363 {
3364 BtPage page;
3365 uid page_no;
3366
3367   while( 1 ) {
3368         while( slot++ < set->page->cnt )
3369           if( slotptr(set->page, slot)->dead )
3370                 continue;
3371           else if( slot < set->page->cnt || bt_getid (set->page->right) )
3372                 return slot;
3373           else
3374                 return 0;
3375
3376         bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3377         bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
3378
3379         if( page_no = bt_getid(set->page->right) )
3380           if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
3381                 set->page = bt_mappage (mgr, set->latch);
3382           else
3383                 return 0;
3384         else
3385           return 0; // EOF
3386
3387         // obtain access lock using lock chaining with Access mode
3388
3389         bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3390         bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3391         bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3392         slot = 0;
3393   }
3394 }
3395
3396 //  advance to next key
3397
3398 BTERR bt_nextkey (BtDb *bt)
3399 {
3400 int cmp;
3401
3402   // first advance last key(s) one next slot
3403
3404   while( 1 ) {
3405         switch( bt->phase ) {
3406         case 0:
3407           bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3408           break;
3409         case 1:
3410           bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3411           break;
3412         case 2:
3413           bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3414           bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3415         break;
3416         }
3417
3418   // return next key
3419
3420         if( bt->cacheslot ) {
3421           bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3422           bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3423           bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3424         }
3425
3426         if( bt->mainslot ) {
3427           bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3428           bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3429           bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3430         }
3431
3432         if( bt->mainslot && bt->cacheslot )
3433           cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3434         else if( bt->mainslot )
3435           cmp = 1;
3436         else if( bt->cacheslot )
3437           cmp = -1;
3438         else
3439           return 0;
3440
3441         //  main key is larger
3442
3443         if( cmp < 0 ) {
3444           bt->phase = 0;
3445           if( bt->cachenode->type == Delete )
3446                 continue;
3447           return bt->cacheslot;
3448         }
3449
3450         //  cache key is larger
3451
3452         if( cmp > 0 ) {
3453           bt->phase = 1;
3454           return bt->mainslot;
3455         }
3456
3457         //      keys are equal
3458
3459         bt->phase = 2;
3460
3461         if( bt->cachenode->type == Delete )
3462                 continue;
3463
3464         return bt->cacheslot;
3465   }
3466 }
3467
3468 //  start sweep of keys
3469
3470 BTERR bt_startkey (BtDb *bt, unsigned char *key, uint len)
3471 {
3472 BtPageSet set[1];
3473 uint slot;
3474
3475         // cache btree page
3476
3477         if( slot = bt_loadpage (bt->mgr, bt->cacheset, key, len, 0, BtLockRead, bt->thread_no) )
3478                 bt->cacheslot = slot - 1;
3479         else
3480           return bt->mgr->err;
3481
3482         // main btree page
3483
3484         if( slot = bt_loadpage (bt->main, bt->mainset, key, len, 0, BtLockRead, bt->thread_no) )
3485                 bt->mainslot = slot - 1;
3486         else
3487           return bt->mgr->err;
3488
3489         bt->phase = 2;
3490         return 0;
3491 }
3492
3493 #ifdef STANDALONE
3494
3495 #ifndef unix
3496 double getCpuTime(int type)
3497 {
3498 FILETIME crtime[1];
3499 FILETIME xittime[1];
3500 FILETIME systime[1];
3501 FILETIME usrtime[1];
3502 SYSTEMTIME timeconv[1];
3503 double ans = 0;
3504
3505         memset (timeconv, 0, sizeof(SYSTEMTIME));
3506
3507         switch( type ) {
3508         case 0:
3509                 GetSystemTimeAsFileTime (xittime);
3510                 FileTimeToSystemTime (xittime, timeconv);
3511                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3512                 break;
3513         case 1:
3514                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3515                 FileTimeToSystemTime (usrtime, timeconv);
3516                 break;
3517         case 2:
3518                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3519                 FileTimeToSystemTime (systime, timeconv);
3520                 break;
3521         }
3522
3523         ans += (double)timeconv->wHour * 3600;
3524         ans += (double)timeconv->wMinute * 60;
3525         ans += (double)timeconv->wSecond;
3526         ans += (double)timeconv->wMilliseconds / 1000;
3527         return ans;
3528 }
3529 #else
3530 #include <time.h>
3531 #include <sys/resource.h>
3532
3533 double getCpuTime(int type)
3534 {
3535 struct rusage used[1];
3536 struct timeval tv[1];
3537
3538         switch( type ) {
3539         case 0:
3540                 gettimeofday(tv, NULL);
3541                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3542
3543         case 1:
3544                 getrusage(RUSAGE_SELF, used);
3545                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3546
3547         case 2:
3548                 getrusage(RUSAGE_SELF, used);
3549                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3550         }
3551
3552         return 0;
3553 }
3554 #endif
3555
3556 void bt_poolaudit (BtMgr *mgr, char *type)
3557 {
3558 BtLatchSet *latch, test[1];
3559 uint entry;
3560
3561         memset (test, 0, sizeof(test));
3562
3563         if( memcmp (test, mgr->latchsets, sizeof(test)) )
3564                 fprintf(stderr, "%s latchset zero overwritten\n", type);
3565
3566         if( memcmp (test, mgr->leafsets, sizeof(test)) )
3567                 fprintf(stderr, "%s leafset zero overwritten\n", type);
3568
3569         for( entry = 0; ++entry < mgr->latchtotal; ) {
3570                 latch = mgr->latchsets + entry;
3571
3572                 if( latch->leaf )
3573                         fprintf(stderr, "%s latchset %d is a leaf on page %d\n", type, entry, latch->page_no);
3574
3575                 if( *latch->modify->value )
3576                         fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
3577
3578                 if( latch->pin & ~CLOCK_bit )
3579                         fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3580         }
3581
3582         for( entry = 0; ++entry < mgr->leaftotal; ) {
3583                 latch = mgr->leafsets + entry;
3584
3585                 if( !latch->leaf )
3586                         fprintf(stderr, "%s leafset %d is not a leaf on page %d\n", type, entry, latch->page_no);
3587
3588                 if( *latch->modify->value )
3589                         fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
3590
3591                 if( latch->pin & ~CLOCK_bit )
3592                         fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3593         }
3594 }
3595
3596 typedef struct {
3597         char idx;
3598         char *type;
3599         char *infile;
3600         BtMgr *main;
3601         BtMgr *mgr;
3602         int num;
3603 } ThreadArg;
3604
3605 //  standalone program to index file of keys
3606 //  then list them onto std-out
3607
3608 #ifdef unix
3609 void *index_file (void *arg)
3610 #else
3611 uint __stdcall index_file (void *arg)
3612 #endif
3613 {
3614 int line = 0, found = 0, cnt = 0, cachecnt, idx;
3615 uid next, page_no = LEAF_page;  // start on first page of leaves
3616 int ch, len = 0, slot, type = 0;
3617 unsigned char key[BT_maxkey];
3618 unsigned char buff[65536];
3619 uint nxt = sizeof(buff);
3620 ThreadArg *args = arg;
3621 BtPageSet set[1];
3622 BtPage page;
3623 int vallen;
3624 BtKey *ptr;
3625 BtVal *val;
3626 BtDb *bt;
3627 FILE *in;
3628
3629         bt = bt_open (args->mgr, args->main);
3630         page = (BtPage)buff;
3631
3632         if( args->idx < strlen (args->type) )
3633                 ch = args->type[args->idx];
3634         else
3635                 ch = args->type[strlen(args->type) - 1];
3636
3637         switch(ch | 0x20)
3638         {
3639         case 'd':
3640                 type = Delete;
3641
3642         case 'p':
3643                 if( !type )
3644                         type = Unique;
3645
3646                 if( args->num )
3647                  if( type == Delete )
3648                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3649                  else
3650                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3651                 else
3652                  if( type == Delete )
3653                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3654                  else
3655                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3656
3657                 if( in = fopen (args->infile, "rb") )
3658                   while( ch = getc(in), ch != EOF )
3659                         if( ch == '\n' )
3660                         {
3661                           line++;
3662
3663                           if( !args->num ) {
3664                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3665                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3666                             len = 0;
3667                                 continue;
3668                           }
3669
3670                           nxt -= len - 10;
3671                           memcpy (buff + nxt, key + 10, len - 10);
3672                           nxt -= 1;
3673                           buff[nxt] = len - 10;
3674                           nxt -= 10;
3675                           memcpy (buff + nxt, key, 10);
3676                           nxt -= 1;
3677                           buff[nxt] = 10;
3678                           slotptr(page,++cnt)->off  = nxt;
3679                           slotptr(page,cnt)->type = type;
3680                           len = 0;
3681
3682                           if( cnt < args->num )
3683                                 continue;
3684
3685                           page->cnt = cnt;
3686                           page->act = cnt;
3687                           page->min = nxt;
3688
3689                           if( bt_atomictxn (bt, page) )
3690                                 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3691                           nxt = sizeof(buff);
3692                           cnt = 0;
3693
3694                         }
3695                         else if( len < BT_maxkey )
3696                                 key[len++] = ch;
3697                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3698                 break;
3699
3700         case 'w':
3701                 fprintf(stderr, "started indexing for %s\n", args->infile);
3702                 if( in = fopen (args->infile, "r") )
3703                   while( ch = getc(in), ch != EOF )
3704                         if( ch == '\n' )
3705                         {
3706                           line++;
3707
3708                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3709                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3710                           len = 0;
3711                         }
3712                         else if( len < BT_maxkey )
3713                                 key[len++] = ch;
3714                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3715                 break;
3716
3717         case 'f':
3718                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3719                 if( in = fopen (args->infile, "rb") )
3720                   while( ch = getc(in), ch != EOF )
3721                         if( ch == '\n' )
3722                         {
3723                           line++;
3724                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3725                                 found++;
3726                           else if( bt->mgr->err )
3727                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3728                           len = 0;
3729                         }
3730                         else if( len < BT_maxkey )
3731                                 key[len++] = ch;
3732                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3733                 break;
3734
3735         case 's':
3736                 fprintf(stderr, "started forward scan\n");
3737                 if( bt_startkey (bt, NULL, 0) )
3738                         fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3739
3740                 while( bt_nextkey (bt) ) {
3741                   if( bt->phase == 1 ) {
3742                         len = bt->mainkey->len;
3743
3744                         if( bt->mainnode->type == Duplicate )
3745                                 len -= BtId;
3746
3747                         fwrite (bt->mainkey->key, len, 1, stdout);
3748                         fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3749                   } else {
3750                         len = bt->cachekey->len;
3751
3752                         if( bt->cachenode->type == Duplicate )
3753                                 len -= BtId;
3754
3755                         fwrite (bt->cachekey->key, len, 1, stdout);
3756                         fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3757                   }
3758
3759                   fputc ('\n', stdout);
3760                   cnt++;
3761             }
3762
3763                 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3764                 bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
3765
3766                 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3767                 bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
3768
3769                 fprintf(stderr, " Total keys read %d\n", cnt);
3770                 break;
3771
3772         case 'r':
3773                 fprintf(stderr, "started reverse scan\n");
3774                 if( bt_lastkey (bt) )
3775                         fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3776
3777                 while( bt_prevkey (bt) ) {
3778                   if( bt->phase == 1 ) {
3779                         len = bt->mainkey->len;
3780
3781                         if( bt->mainnode->type == Duplicate )
3782                                 len -= BtId;
3783
3784                         fwrite (bt->mainkey->key, len, 1, stdout);
3785                         fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3786                   } else {
3787                         len = bt->cachekey->len;
3788
3789                         if( bt->cachenode->type == Duplicate )
3790                                 len -= BtId;
3791
3792                         fwrite (bt->cachekey->key, len, 1, stdout);
3793                         fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3794                   }
3795
3796                   fputc ('\n', stdout);
3797                   cnt++;
3798             }
3799
3800                 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3801                 bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
3802
3803                 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3804                 bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
3805
3806                 fprintf(stderr, " Total keys read %d\n", cnt);
3807                 break;
3808
3809         case 'c':
3810                 fprintf(stderr, "started counting LSM cache btree\n");
3811                 next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
3812                 page_no = LEAF_page;
3813
3814 #ifdef unix
3815                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3816 #endif
3817                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3818                   pread(bt->mgr->idx, page, sizeof(*page), page_no << bt->mgr->page_bits);
3819                   if( !page->lvl && !page->free )
3820                     cnt += page->act;
3821                   if( next )
3822                         page_no = next;
3823                   else
3824                         page_no += page->lvl ? 1 : (1 << bt->mgr->leaf_xtra);
3825                   next = 0;
3826                 }
3827                 
3828                 cachecnt = --cnt;       // remove stopper key
3829                 cnt = 0;
3830
3831                 fprintf(stderr, "started counting LSM main btree\n");
3832                 next = bt->main->redopage + bt->main->pagezero->redopages;
3833                 page_no = LEAF_page;
3834
3835 #ifdef unix
3836                 posix_fadvise( bt->main->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3837 #endif
3838                 while( page_no < bt_getid(bt->main->pagezero->alloc->right) ) {
3839                   pread(bt->main->idx, page, sizeof(*page), page_no << bt->main->page_bits);
3840                   if( !page->lvl )
3841                     cnt += page->act;
3842                   if( next )
3843                         page_no = next;
3844                   else
3845                         page_no += page->lvl ? 1 : (1 << bt->main->leaf_xtra);
3846                   next = 0;
3847                 }
3848                 
3849                 cnt--;  // remove stopper key
3850
3851                 fprintf(stderr, " cache keys counted %d\n", cachecnt);
3852                 fprintf(stderr, " main keys counted %d\n", cnt);
3853                 fprintf(stderr, " Total keys counted %d\n", cnt + cachecnt);
3854                 break;
3855         }
3856
3857         bt_close (bt);
3858 #ifdef unix
3859         return NULL;
3860 #else
3861         return 0;
3862 #endif
3863 }
3864
3865 typedef struct timeval timer;
3866
3867 int main (int argc, char **argv)
3868 {
3869 int idx, cnt, len, slot, err;
3870 double start, stop;
3871 #ifdef unix
3872 pthread_t *threads;
3873 #else
3874 HANDLE *threads;
3875 #endif
3876 ThreadArg *args;
3877 uint mainleafpool = 0;
3878 uint mainleafxtra = 0;
3879 uint redopages = 0;
3880 uint poolsize = 0;
3881 uint leafpool = 0;
3882 uint leafxtra = 0;
3883 uint mainpool = 0;
3884 uint mainbits = 0;
3885 int bits = 16;
3886 float elapsed;
3887 int num = 0;
3888 char key[1];
3889 BtMgr *main;
3890 BtMgr *mgr;
3891 BtKey *ptr;
3892
3893         if( argc < 3 ) {
3894                 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
3895                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3896                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3897                 fprintf (stderr, "  cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n");
3898                 fprintf (stderr, "  pagebits is the page size in bits for the cache btree\n");
3899                 fprintf (stderr, "  leafbits is the number of xtra bits for a leaf page\n");
3900                 fprintf (stderr, "  poolsize is the number of pages in buffer pool for the cache btree\n");
3901                 fprintf (stderr, "  leafpool is the number of leaf pages in leaf pool for the cache btree\n");
3902                 fprintf (stderr, "  txnsize = n to block transactions into n units, or zero for no transactions\n");
3903                 fprintf (stderr, "  redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3904                 fprintf (stderr, "  mainbits is the page size of the main btree in bits\n");
3905                 fprintf (stderr, "  mainpool is the number of main pages in the main buffer pool\n");
3906                 fprintf (stderr, "  mainleaf is the number of main pages in the main leaf pool\n");
3907                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3908                 exit(0);
3909         }
3910
3911         start = getCpuTime(0);
3912
3913         if( argc > 4 )
3914                 bits = atoi(argv[4]);
3915
3916         if( argc > 5 )
3917                 leafxtra = atoi(argv[5]);
3918
3919         if( argc > 6 )
3920                 poolsize = atoi(argv[6]);
3921
3922         if( !poolsize )
3923                 fprintf (stderr, "no page pool\n"), exit(1);
3924
3925         if( argc > 7 )
3926                 leafpool = atoi(argv[7]);
3927
3928         if( argc > 8 )
3929                 num = atoi(argv[8]);
3930
3931         if( argc > 9 )
3932                 redopages = atoi(argv[9]);
3933
3934         if( redopages > 65535 )
3935                 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
3936
3937         if( argc > 10 )
3938                 mainbits = atoi(argv[10]);
3939
3940         if( argc > 11 )
3941                 mainleafxtra = atoi(argv[11]);
3942
3943         if( argc > 12 )
3944                 mainpool = atoi(argv[12]);
3945
3946         if( argc > 13 )
3947                 mainleafpool = atoi(argv[13]);
3948
3949         cnt = argc - 14;
3950 #ifdef unix
3951         threads = malloc (cnt * sizeof(pthread_t));
3952 #else
3953         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3954 #endif
3955         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3956
3957         mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
3958
3959         if( !mgr ) {
3960                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3961                 exit (1);
3962         } else
3963                 mgr->type = 0;
3964
3965         if( mainbits ) {
3966                 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
3967
3968                 if( !main ) {
3969                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
3970                         exit (1);
3971                 } else
3972                         main->type = 1;
3973         } else
3974                 main = NULL;
3975
3976         //      fire off threads
3977
3978         if( cnt )
3979           for( idx = 0; idx < cnt; idx++ ) {
3980                 args[idx].infile = argv[idx + 14];
3981                 args[idx].type = argv[3];
3982                 args[idx].main = main;
3983                 args[idx].mgr = mgr;
3984                 args[idx].num = num;
3985                 args[idx].idx = idx;
3986 #ifdef unix
3987                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3988                         fprintf(stderr, "Error creating thread %d\n", err);
3989 #else
3990                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3991 #endif
3992           }
3993         else {
3994                 args[0].infile = argv[idx + 10];
3995                 args[0].type = argv[3];
3996                 args[0].main = main;
3997                 args[0].mgr = mgr;
3998                 args[0].num = num;
3999                 args[0].idx = 0;
4000                 index_file (args);
4001         }
4002
4003         //      wait for termination
4004
4005 #ifdef unix
4006         for( idx = 0; idx < cnt; idx++ )
4007                 pthread_join (threads[idx], NULL);
4008 #else
4009         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
4010
4011         for( idx = 0; idx < cnt; idx++ )
4012                 CloseHandle(threads[idx]);
4013 #endif
4014         bt_poolaudit(mgr, "cache");
4015
4016         if( main )
4017                 bt_poolaudit(main, "main");
4018
4019         fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found);
4020         if( main )
4021                 fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
4022
4023         bt_mgrclose (mgr);
4024
4025         if( main )
4026                 bt_mgrclose (main);
4027
4028         elapsed = getCpuTime(0) - start;
4029         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4030         elapsed = getCpuTime(1);
4031         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4032         elapsed = getCpuTime(2);
4033         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4034 }
4035
4036 BtKey *bt_key (BtPage page, uint slot)
4037 {
4038 return keyptr(page,slot);
4039 }
4040
4041 BtSlot *bt_slot (BtPage page, uint slot)
4042 {
4043 return slotptr(page,slot);
4044 }
4045 #endif  //STANDALONE