]> pd.if.org Git - btree/blob - threadskv10g.c
Fix small bug in main when there is less t han one input file
[btree] / threadskv10g.c
1 // btree version threadskv10g futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair re-entrant reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      LSM B-trees for write optimization
11 //      larger sized leaf pages than non-leaf
12 //      and LSM B-tree find & count operations
13
14 // 28 OCT 2014
15
16 // author: karl malbrain, malbrain@cal.berkeley.edu
17
18 /*
19 This work, including the source code, documentation
20 and related data, is placed into the public domain.
21
22 The orginal author is Karl Malbrain.
23
24 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
25 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
26 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
27 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
28 RESULTING FROM THE USE, MODIFICATION, OR
29 REDISTRIBUTION OF THIS SOFTWARE.
30 */
31
32 // Please see the project home page for documentation
33 // code.google.com/p/high-concurrency-btree
34
35 #define _FILE_OFFSET_BITS 64
36 #define _LARGEFILE64_SOURCE
37
38 #ifdef linux
39 #define _GNU_SOURCE
40 #include <xmmintrin.h>
41 #include <linux/futex.h>
42 #define SYS_futex 202
43 #endif
44
45 #ifdef unix
46 #include <unistd.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <fcntl.h>
50 #include <sys/time.h>
51 #include <sys/mman.h>
52 #include <errno.h>
53 #include <pthread.h>
54 #include <limits.h>
55 #else
56 #define WIN32_LEAN_AND_MEAN
57 #include <windows.h>
58 #include <stdio.h>
59 #include <stdlib.h>
60 #include <time.h>
61 #include <fcntl.h>
62 #include <process.h>
63 #include <intrin.h>
64 #endif
65
66 #include <memory.h>
67 #include <string.h>
68 #include <stddef.h>
69
70 typedef unsigned long long      uid;
71 typedef unsigned long long      logseqno;
72
73 #ifndef unix
74 typedef unsigned long long      off64_t;
75 typedef unsigned short          ushort;
76 typedef unsigned int            uint;
77 #endif
78
79 #define BT_ro 0x6f72    // ro
80 #define BT_rw 0x7772    // rw
81
82 #define BT_maxbits              26                                      // maximum page size in bits
83 #define BT_minbits              9                                       // minimum page size in bits
84 #define BT_minpage              (1 << BT_minbits)       // minimum page size
85 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
86
87 //  BTree page number constants
88 #define ALLOC_page              0       // allocation page
89 #define ROOT_page               1       // root of the btree
90 #define LEAF_page               2       // first page of leaves
91
92 //      Number of levels to create in a new BTree
93
94 #define MIN_lvl                 2
95
96 /*
97 There are six lock types for each node in four independent sets: 
98 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
99 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
100 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
101 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
102 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
103 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification. 
104 */
105
106 typedef enum{
107         BtLockAccess = 1,
108         BtLockDelete = 2,
109         BtLockRead   = 4,
110         BtLockWrite  = 8,
111         BtLockParent = 16,
112         BtLockLink   = 32
113 } BtLock;
114
115 typedef struct {
116   union {
117         struct {
118           volatile unsigned char xcl[1];
119           volatile unsigned char filler;
120           volatile ushort waiters[1];
121         } bits[1];
122         uint value[1];
123   };
124 } MutexLatch;
125
126 //      definition for reader/writer reentrant lock implementation
127
128 typedef struct {
129   MutexLatch xcl[1];
130   MutexLatch wrt[1];
131   ushort readers;
132   ushort dup;           // re-entrant locks
133   ushort tid;           // owner thread-no
134   ushort line;          // owner line #
135 } RWLock;
136
137 //  hash table entries
138
139 typedef struct {
140         MutexLatch latch[1];
141         uint entry;             // Latch table entry at head of chain
142 } BtHashEntry;
143
144 //      latch manager table structure
145
146 typedef struct {
147         uid page_no;                    // latch set page number
148         RWLock readwr[1];               // read/write page lock
149         RWLock access[1];               // Access Intent/Page delete
150         RWLock parent[1];               // Posting of fence key in parent
151         RWLock link[1];                 // left link update in progress
152         MutexLatch modify[1];   // modify entry lite latch
153         uint split;                             // right split page atomic insert
154         uint next;                              // next entry in hash table chain
155         uint prev;                              // prev entry in hash table chain
156         volatile ushort pin;    // number of accessing threads
157         unsigned char dirty;    // page in cache is dirty
158         unsigned char leaf;             // page in cache is a leaf
159 } BtLatchSet;
160
161 //      Define the length of the page record numbers
162
163 #define BtId 6
164
165 //      Page key slot definition.
166
167 //      Keys are marked dead, but remain on the page until
168 //      it cleanup is called. The fence key (highest key) for
169 //      a leaf page is always present, even after cleanup.
170
171 //      Slot types
172
173 //      In addition to the Unique keys that occupy slots
174 //      there are Librarian and Duplicate key
175 //      slots occupying the key slot array.
176
177 //      The Librarian slots are dead keys that
178 //      serve as filler, available to add new Unique
179 //      or Dup slots that are inserted into the B-tree.
180
181 //      The Duplicate slots have had their key bytes extended
182 //      by 6 bytes to contain a binary duplicate key uniqueifier.
183
184 typedef enum {
185         Unique,
186         Update,
187         Librarian,
188         Duplicate,
189         Delete
190 } BtSlotType;
191
192 typedef struct {
193         uint off:BT_maxbits;    // page offset for key start
194         uint type:3;                    // type of slot
195         uint dead:1;                    // set for deleted slot
196 } BtSlot;
197
198 //      The key structure occupies space at the upper end of
199 //      each page.  It's a length byte followed by the key
200 //      bytes.
201
202 typedef struct {
203         unsigned char len;              // this can be changed to a ushort or uint
204         unsigned char key[0];
205 } BtKey;
206
207 //      the value structure also occupies space at the upper
208 //      end of the page. Each key is immediately followed by a value.
209
210 typedef struct {
211         unsigned char len;              // this can be changed to a ushort or uint
212         unsigned char value[0];
213 } BtVal;
214
215 #define BT_maxkey       255             // maximum number of bytes in a key
216 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
217
218 //      The first part of an index page.
219 //      It is immediately followed
220 //      by the BtSlot array of keys.
221
222 typedef struct BtPage_ {
223         uint cnt;                                       // count of keys in page
224         uint act;                                       // count of active keys
225         uint min;                                       // next key/value offset
226         uint fence;                                     // page fence key offset
227         uint garbage;                           // page garbage in bytes
228         unsigned char lvl;                      // level of page, zero = leaf
229         unsigned char free;                     // page is on the free chain
230         unsigned char kill;                     // page is being deleted
231         unsigned char nopromote;        // page is being constructed
232         uid right, left;                        // page numbers to right and left
233         logseqno lsn;                           // log sequence number applied
234 } *BtPage;
235
236 //  The loadpage interface object
237
238 typedef struct {
239         BtPage page;            // current page pointer
240         BtLatchSet *latch;      // current page latch set
241 } BtPageSet;
242
243 //      structure for latch manager on ALLOC_page
244
245 typedef struct {
246         struct BtPage_ alloc[1];                // next page_no in right ptr
247         uid freechain;                                  // head of free page_nos chain
248         uid leafchain;                                  // head of leaf page_nos chain
249         unsigned long long leafpages;   // number of active leaf pages
250         unsigned long long upperpages;  // number of active upper pages
251         uint redopages;                                 // number of redo pages in file
252         unsigned char leaf_xtra;                // leaf page size in xtra bits
253         unsigned char page_bits;                // base page size in bits
254 } BtPageZero;
255
256 //      The object structure for Btree access
257
258 typedef struct {
259         uint page_size;                         // base page size       
260         uint page_bits;                         // base page size in bits       
261         uint leaf_xtra;                         // leaf xtra bits       
262 #ifdef unix
263         int idx;
264 #else
265         HANDLE idx;
266 #endif
267         BtPageZero *pagezero;           // mapped allocation page
268         BtHashEntry *hashtable;         // the buffer pool hash table entries
269         BtHashEntry *leaftable;         // the buffer pool hash table entries
270         BtLatchSet *latchsets;          // mapped latch set from buffer pool
271         BtLatchSet *leafsets;           // mapped latch set from buffer pool
272         unsigned char *pagepool;        // mapped to the buffer pool pages
273         unsigned char *leafpool;        // mapped to the leaf pool pages
274         unsigned char *redobuff;        // mapped recovery buffer pointer
275         logseqno lsn, flushlsn;         // current & first lsn flushed
276         MutexLatch redo[1];                     // redo area lite latch
277         MutexLatch lock[1];                     // allocation area lite latch
278         ushort thread_no[1];            // highest thread number issued
279         ushort err_thread;                      // error thread number
280         uint nlatchpage;                        // size of buffer pool & latchsets
281         uint latchtotal;                        // number of page latch entries
282         uint latchhash;                         // number of latch hash table slots
283         uint latchvictim;                       // next latch entry to swap out
284         uint nleafpage;                         // size of leaf pool & leafsets
285         uint leaftotal;                         // number of leaf latch entries
286         uint leafhash;                          // number of leaf hash table slots
287         uint leafvictim;                        // next leaf entry to swap out
288         uint leafpromote;                       // next leaf entry to promote
289         uint redopage;                          // page number of redo buffer
290         uint redolast;                          // last msync size of recovery buff
291         uint redoend;                           // eof/end element in recovery buff
292         int err;                                        // last error
293         int line;                                       // last error line no
294         int found;                                      // number of keys found by delete
295         int reads, writes;                      // number of reads and writes
296         int type;                                       // type of LSM tree 0=cache, 1=main
297 #ifndef unix
298         HANDLE halloc;                          // allocation handle
299         HANDLE hpool;                           // buffer pool handle
300 #endif
301         unsigned char *page0;           // memory mapped pagezero of b-tree
302 } BtMgr;
303
304 typedef struct {
305         BtMgr *mgr;                                     // buffer manager for entire process
306         BtMgr *main;                            // buffer manager for main btree
307         BtPageSet cacheset[1];  // cached page frame for cache btree
308         BtPageSet mainset[1];   // cached page frame for main btree
309         uint cacheslot;                         // slot number in cacheset
310         uint mainslot;                          // slot number in mainset
311         ushort phase;                           // 1 = main btree 0 = cache btree 2 = both
312         ushort thread_no;                       // thread number
313         BtSlot *cachenode;
314         BtSlot *mainnode;
315         BtKey *cachekey;
316         BtKey *mainkey;
317         BtVal *cacheval;
318         BtVal *mainval;
319 } BtDb;
320
321 //      atomic txn structure
322
323 typedef struct {
324         logseqno reqlsn;        // redo log seq no required
325         uint entry:31;          // latch table entry number
326         uint reuse:1;           // reused previous page
327         uint slot;                      // slot on page
328         uint src;                       // source slot
329 } AtomicTxn;
330
331 //      Catastrophic errors
332
333 typedef enum {
334         BTERR_ok = 0,
335         BTERR_struct,
336         BTERR_ovflw,
337         BTERR_lock,
338         BTERR_map,
339         BTERR_read,
340         BTERR_wrt,
341         BTERR_atomic,
342         BTERR_recovery
343 } BTERR;
344
345 #define CLOCK_bit 0x8000
346
347 // recovery manager entry types
348
349 typedef enum {
350         BTRM_eof = 0,   // rest of buffer is emtpy
351         BTRM_add,               // add a unique key-value to btree
352         BTRM_dup,               // add a duplicate key-value to btree
353         BTRM_del,               // delete a key-value from btree
354         BTRM_upd,               // update a key with a new value
355         BTRM_new,               // allocate a new empty page
356         BTRM_old                // reuse an old empty page
357 } BTRM;
358
359 // recovery manager entry
360 //      structure followed by BtKey & BtVal
361
362 typedef struct {
363         logseqno reqlsn;        // log sequence number required
364         logseqno lsn;           // log sequence number for entry
365         uint len;                       // length of entry
366         unsigned char type;     // type of entry
367         unsigned char lvl;      // level of btree entry pertains to
368 } BtLogHdr;
369
370 // B-Tree functions
371
372 extern void bt_close (BtDb *bt);
373 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
374 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
375 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
376 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
377 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
378 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
379 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
380
381 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
382
383 extern BTERR bt_startkey (BtDb *db, unsigned char *key, uint len);
384 extern BTERR bt_nextkey (BtDb *bt);
385
386 extern uint bt_lastkey (BtDb *bt);
387 extern uint bt_prevkey (BtDb *bt);
388
389 //      manager functions
390 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
391 extern void bt_mgrclose (BtMgr *mgr);
392 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
393 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
394
395 //      atomic transaction functions
396 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, logseqno lsn, int lsm, ushort thread_no);
397 BTERR bt_promote (BtDb *bt);
398
399 //  The page is allocated from low and hi ends.
400 //  The key slots are allocated from the bottom,
401 //      while the text and value of the key
402 //  are allocated from the top.  When the two
403 //  areas meet, the page is split into two.
404
405 //  A key consists of a length byte, two bytes of
406 //  index number (0 - 65535), and up to 253 bytes
407 //  of key value.
408
409 //  Associated with each key is a value byte string
410 //      containing any value desired.
411
412 //  The b-tree root is always located at page 1.
413 //      The first leaf page of level zero is always
414 //      located on page 2.
415
416 //      The b-tree pages are linked with next
417 //      pointers to facilitate enumerators,
418 //      and provide for concurrency.
419
420 //      When to root page fills, it is split in two and
421 //      the tree height is raised by a new root at page
422 //      one with two keys.
423
424 //      Deleted keys are marked with a dead bit until
425 //      page cleanup. The fence key for a leaf node is
426 //      always present
427
428 //  To achieve maximum concurrency one page is locked at a time
429 //  as the tree is traversed to find leaf key in question. The right
430 //  page numbers are used in cases where the page is being split,
431 //      or consolidated.
432
433 //  Page 0 is dedicated to lock for new page extensions,
434 //      and chains empty pages together for reuse. It also
435 //      contains the latch manager hash table.
436
437 //      The ParentModification lock on a node is obtained to serialize posting
438 //      or changing the fence key for a node.
439
440 //      Empty pages are chained together through the ALLOC page and reused.
441
442 //      Access macros to address slot and key values from the page
443 //      Page slots use 1 based indexing.
444
445 #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
446 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
447 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
448 #define fenceptr(page) ((BtKey*)((unsigned char*)(page) + page->fence))
449
450 void bt_putid(unsigned char *dest, uid id)
451 {
452 int i = BtId;
453
454         while( i-- )
455                 dest[i] = (unsigned char)id, id >>= 8;
456 }
457
458 uid bt_getid(unsigned char *src)
459 {
460 uid id = 0;
461 int i;
462
463         for( i = 0; i < BtId; i++ )
464                 id <<= 8, id |= *src++; 
465
466         return id;
467 }
468
469 //      lite weight spin lock Latch Manager
470
471 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
472 {
473         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
474 }
475
476 void bt_mutexlock(MutexLatch *latch)
477 {
478 uint idx, waited = 0;
479 MutexLatch prev[1];
480
481  while( 1 ) {
482   for( idx = 0; idx < 100; idx++ ) {
483         *prev->value = __sync_fetch_and_or (latch->value, 1);
484         if( !*prev->bits->xcl ) {
485           if( waited )
486                 __sync_fetch_and_sub (latch->bits->waiters, 1);
487           return;
488         }
489   }
490
491   if( !waited ) {
492         __sync_fetch_and_add (latch->bits->waiters, 1);
493         *prev->bits->waiters += 1;
494         waited++;
495   }
496
497   sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
498  }
499 }
500
501 int bt_mutextry(MutexLatch *latch)
502 {
503         return !__sync_lock_test_and_set (latch->bits->xcl, 1);
504 }
505
506 void bt_releasemutex(MutexLatch *latch)
507 {
508 MutexLatch prev[1];
509
510         *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
511
512         if( *prev->bits->waiters )
513                 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
514 }
515
516 //      reader/writer lock implementation
517
518 void WriteLock (RWLock *lock, ushort tid, uint line)
519 {
520         if( lock->tid == tid ) {
521                 lock->dup++;
522                 return;
523         }
524         bt_mutexlock (lock->xcl);
525         bt_mutexlock (lock->wrt);
526         bt_releasemutex (lock->xcl);
527
528         lock->line = line;
529         lock->tid = tid;
530 }
531
532 void WriteRelease (RWLock *lock)
533 {
534         if( lock->dup ) {
535                 lock->dup--;
536                 return;
537         }
538         lock->tid = 0;
539         bt_releasemutex (lock->wrt);
540 }
541
542 void ReadLock (RWLock *lock, ushort tid)
543 {
544         bt_mutexlock (lock->xcl);
545
546         if( !__sync_fetch_and_add (&lock->readers, 1) )
547                 bt_mutexlock (lock->wrt);
548
549         bt_releasemutex (lock->xcl);
550 }
551
552 void ReadRelease (RWLock *lock)
553 {
554         if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
555                 bt_releasemutex (lock->wrt);
556 }
557
558 //      recovery manager -- flush dirty pages
559
560 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
561 {
562 uint cnt3 = 0, cnt2 = 0, cnt = 0;
563 uint entry, segment;
564 BtLatchSet *latch;
565 BtPage page;
566
567   //    flush dirty pool pages to the btree
568
569 fprintf(stderr, "Start flushlsn  ");
570   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
571         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
572         latch = mgr->latchsets + entry;
573         bt_mutexlock (latch->modify);
574         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
575
576         if( latch->dirty ) {
577           bt_writepage(mgr, page, latch->page_no, 0);
578           latch->dirty = 0, cnt++;
579         }
580 if( latch->pin & ~CLOCK_bit )
581 cnt2++;
582         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
583         bt_releasemutex (latch->modify);
584   }
585   for( entry = 1; entry < mgr->leaftotal; entry++ ) {
586         page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
587         latch = mgr->leafsets + entry;
588         bt_mutexlock (latch->modify);
589         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
590
591         if( latch->dirty ) {
592           bt_writepage(mgr, page, latch->page_no, 1);
593           latch->dirty = 0, cnt++;
594         }
595 if( latch->pin & ~CLOCK_bit )
596 cnt2++;
597         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
598         bt_releasemutex (latch->modify);
599   }
600 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
601 }
602
603 //      recovery manager -- process current recovery buff on startup
604 //      this won't do much if previous session was properly closed.
605
606 BTERR bt_recoveryredo (BtMgr *mgr)
607 {
608 BtLogHdr *hdr, *eof;
609 uint offset = 0;
610 BtKey *key;
611 BtVal *val;
612
613   hdr = (BtLogHdr *)mgr->redobuff;
614   mgr->flushlsn = hdr->lsn;
615
616   while( 1 ) {
617         hdr = (BtLogHdr *)(mgr->redobuff + offset);
618         switch( hdr->type ) {
619         case BTRM_eof:
620                 mgr->lsn = hdr->lsn;
621                 return 0;
622         case BTRM_add:          // add a unique key-value to btree
623         
624         case BTRM_dup:          // add a duplicate key-value to btree
625         case BTRM_del:          // delete a key-value from btree
626         case BTRM_upd:          // update a key with a new value
627         case BTRM_new:          // allocate a new empty page
628         case BTRM_old:          // reuse an old empty page
629                 return 0;
630         }
631   }
632 }
633
634 //      recovery manager -- append new entry to recovery log
635 //      flush dirty pages to disk when it overflows.
636
637 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
638 {
639 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
640 uint amt = sizeof(BtLogHdr);
641 BtLogHdr *hdr, *eof;
642 uint last, end;
643
644         bt_mutexlock (mgr->redo);
645
646         if( key )
647           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
648
649         //      see if new entry fits in buffer
650         //      flush and reset if it doesn't
651
652         if( amt > size - mgr->redoend ) {
653           mgr->flushlsn = mgr->lsn;
654           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
655                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
656           mgr->redolast = 0;
657           mgr->redoend = 0;
658           bt_flushlsn(mgr, thread_no);
659         }
660
661         //      fill in new entry & either eof or end block
662
663         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
664
665         hdr->len = amt;
666         hdr->type = type;
667         hdr->lvl = lvl;
668         hdr->lsn = ++mgr->lsn;
669
670         mgr->redoend += amt;
671
672         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
673         memset (eof, 0, sizeof(BtLogHdr));
674
675         //  fill in key and value
676
677         if( key ) {
678           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
679           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
680         }
681
682         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
683         memset (eof, 0, sizeof(BtLogHdr));
684         eof->lsn = mgr->lsn;
685
686         last = mgr->redolast & ~0xfff;
687         end = mgr->redoend;
688
689         if( end - last + sizeof(BtLogHdr) >= 32768 )
690           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
691                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
692           else
693                 mgr->redolast = end;
694
695         bt_releasemutex(mgr->redo);
696         return hdr->lsn;
697 }
698
699 //      recovery manager -- append transaction to recovery log
700 //      flush dirty pages to disk when it overflows.
701
702 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
703 {
704 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
705 uint amt = 0, src, type;
706 BtLogHdr *hdr, *eof;
707 uint last, end;
708 logseqno lsn;
709 BtKey *key;
710 BtVal *val;
711
712         //      determine amount of redo recovery log space required
713
714         for( src = 0; src++ < source->cnt; ) {
715           key = keyptr(source,src);
716           val = valptr(source,src);
717           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
718           amt += sizeof(BtLogHdr);
719         }
720
721         bt_mutexlock (mgr->redo);
722
723         //      see if new entry fits in buffer
724         //      flush and reset if it doesn't
725
726         if( amt > size - mgr->redoend ) {
727           mgr->flushlsn = mgr->lsn;
728           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
729                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
730           mgr->redolast = 0;
731           mgr->redoend = 0;
732           bt_flushlsn (mgr, thread_no);
733         }
734
735         //      assign new lsn to transaction
736
737         lsn = ++mgr->lsn;
738
739         //      fill in new entries
740
741         for( src = 0; src++ < source->cnt; ) {
742           key = keyptr(source, src);
743           val = valptr(source, src);
744
745           switch( slotptr(source, src)->type ) {
746           case Unique:
747                 type = BTRM_add;
748                 break;
749           case Duplicate:
750                 type = BTRM_dup;
751                 break;
752           case Delete:
753                 type = BTRM_del;
754                 break;
755           }
756
757           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
758           amt += sizeof(BtLogHdr);
759
760           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
761           hdr->len = amt;
762           hdr->type = type;
763           hdr->lsn = lsn;
764           hdr->lvl = 0;
765
766           //  fill in key and value
767
768           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
769           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
770
771           mgr->redoend += amt;
772         }
773
774         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
775         memset (eof, 0, sizeof(BtLogHdr));
776         eof->lsn = lsn;
777
778         last = mgr->redolast & ~0xfff;
779         end = mgr->redoend;
780
781         if( end - last + sizeof(BtLogHdr) >= 32768 )
782           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
783                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
784           else
785                 mgr->redolast = end;
786
787         bt_releasemutex(mgr->redo);
788         return lsn;
789 }
790
791 //      read page into buffer pool from permanent location in Btree file
792
793 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
794 {
795 uint page_size = mgr->page_size;
796
797   if( leaf )
798         page_size <<= mgr->leaf_xtra;
799
800   if( pread(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
801         return mgr->err = BTERR_read;
802
803   __sync_fetch_and_add (&mgr->reads, 1);
804   return 0;
805 }
806
807 //      write page to permanent location in Btree file
808
809 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
810 {
811 uint page_size = mgr->page_size;
812
813   if( leaf )
814         page_size <<= mgr->leaf_xtra;
815
816   if( pwrite(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
817         return mgr->err = BTERR_wrt;
818
819   __sync_fetch_and_add (&mgr->writes, 1);
820   return 0;
821 }
822
823 //      set CLOCK bit in latch
824 //      decrement pin count
825
826 void bt_unpinlatch (BtLatchSet *latch, uint dirty, ushort thread_no, uint line)
827 {
828         bt_mutexlock(latch->modify);
829         if( dirty )
830                 latch->dirty = 1;
831         latch->pin |= CLOCK_bit;
832         latch->pin--;
833         bt_releasemutex(latch->modify);
834 }
835
836 //  return the btree cached page address
837
838 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
839 {
840 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
841 BtPage page;
842
843   if( latch->leaf )
844         page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
845   else
846         page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
847
848   return page;
849 }
850
851 //  return next available leaf entry
852 //        and with latch entry locked
853
854 uint bt_availleaf (BtMgr *mgr)
855 {
856 BtLatchSet *latch;
857 uint entry;
858
859   while( 1 ) {
860 #ifdef unix
861         entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
862 #else
863         entry = _InterlockedIncrement (&mgr->leafvictim);
864 #endif
865         entry %= mgr->leaftotal;
866
867         if( !entry )
868                 continue;
869
870         latch = mgr->leafsets + entry;
871
872         if( !bt_mutextry(latch->modify) )
873                 continue;
874
875         //  return this entry if it is not pinned
876
877         if( !latch->pin )
878                 return entry;
879
880         //      if the CLOCK bit is set
881         //      reset it to zero.
882
883         latch->pin &= ~CLOCK_bit;
884         bt_releasemutex(latch->modify);
885   }
886 }
887
888 //  return next available latch entry
889 //        and with latch entry locked
890
891 uint bt_availnext (BtMgr *mgr)
892 {
893 BtLatchSet *latch;
894 uint entry;
895
896   while( 1 ) {
897 #ifdef unix
898         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
899 #else
900         entry = _InterlockedIncrement (&mgr->latchvictim);
901 #endif
902         entry %= mgr->latchtotal;
903
904         if( !entry )
905                 continue;
906
907         latch = mgr->latchsets + entry;
908
909         if( !bt_mutextry(latch->modify) )
910                 continue;
911
912         //  return this entry if it is not pinned
913
914         if( !latch->pin )
915                 return entry;
916
917         //      if the CLOCK bit is set
918         //      reset it to zero.
919
920         latch->pin &= ~CLOCK_bit;
921         bt_releasemutex(latch->modify);
922   }
923 }
924
925 //      pin leaf in leaf buffer pool
926 //      return with latchset pinned
927
928 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
929 {
930 uint hashidx = page_no % mgr->leafhash;
931 uint entry, oldidx;
932 BtLatchSet *latch;
933 BtPage page;
934
935   //  try to find our entry
936
937   bt_mutexlock(mgr->leaftable[hashidx].latch);
938
939   if( entry = mgr->leaftable[hashidx].entry )
940    do {
941         latch = mgr->leafsets + entry;
942
943         if( page_no == latch->page_no )
944                 break;
945    } while( entry = latch->next );
946
947   //  found our entry: increment pin
948
949   if( entry ) {
950         latch = mgr->leafsets + entry;
951         bt_mutexlock(latch->modify);
952         latch->pin |= CLOCK_bit;
953         latch->pin++;
954
955         bt_releasemutex(latch->modify);
956         bt_releasemutex(mgr->leaftable[hashidx].latch);
957         return latch;
958   }
959
960   //  find and reuse unpinned entry
961
962 trynext:
963   entry = bt_availleaf (mgr);
964   latch = mgr->leafsets + entry;
965
966   page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
967   oldidx = latch->page_no % mgr->leafhash;
968
969   //  skip over this entry if old latch is not available
970
971   if( latch->page_no )
972    if( oldidx != hashidx )
973     if( !bt_mutextry (mgr->leaftable[oldidx].latch) ) {
974           bt_releasemutex(latch->modify);
975           goto trynext;
976         }
977
978   //  update permanent page area in btree from buffer pool
979   //    no read-lock is required since page is not pinned.
980
981   if( latch->dirty )
982         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
983           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
984         else
985           latch->dirty = 0;
986
987   //  if latch is on a different hash chain
988   //    unlink from the old page_no chain
989
990   if( latch->page_no )
991    if( oldidx != hashidx ) {
992         if( latch->prev )
993           mgr->leafsets[latch->prev].next = latch->next;
994         else
995           mgr->leaftable[oldidx].entry = latch->next;
996
997         if( latch->next )
998           mgr->leafsets[latch->next].prev = latch->prev;
999
1000     bt_releasemutex (mgr->leaftable[oldidx].latch);
1001    }
1002
1003   if( bt_readpage (mgr, page, page_no, 1) )
1004         return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1005
1006   //  link page as head of hash table chain
1007   //  if this is a never before used entry,
1008   //  or it was previously on a different
1009   //  hash table chain. Otherwise, just
1010   //  leave it in its current hash table
1011   //  chain position.
1012
1013   if( !latch->page_no || hashidx != oldidx ) {
1014         if( latch->next = mgr->leaftable[hashidx].entry )
1015           mgr->leafsets[latch->next].prev = entry;
1016
1017         mgr->leaftable[hashidx].entry = entry;
1018     latch->prev = 0;
1019   }
1020
1021   //  fill in latch structure
1022
1023   latch->pin = CLOCK_bit | 1;
1024   latch->page_no = page_no;
1025   latch->leaf = 1;
1026
1027   bt_releasemutex (latch->modify);
1028   bt_releasemutex (mgr->leaftable[hashidx].latch);
1029   return latch;
1030 }
1031
1032 //      pin page in non-leaf buffer pool
1033 //      return with latchset pinned
1034
1035 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1036 {
1037 uint hashidx = page_no % mgr->latchhash;
1038 uint entry, oldidx;
1039 BtLatchSet *latch;
1040 BtPage page;
1041
1042   //  try to find our entry
1043
1044   bt_mutexlock(mgr->hashtable[hashidx].latch);
1045
1046   if( entry = mgr->hashtable[hashidx].entry ) do
1047   {
1048         latch = mgr->latchsets + entry;
1049
1050         if( page_no == latch->page_no )
1051                 break;
1052   } while( entry = latch->next );
1053
1054   //  found our entry: increment pin
1055
1056   if( entry ) {
1057         latch = mgr->latchsets + entry;
1058         bt_mutexlock(latch->modify);
1059         latch->pin |= CLOCK_bit;
1060         latch->pin++;
1061         bt_releasemutex(latch->modify);
1062         bt_releasemutex(mgr->hashtable[hashidx].latch);
1063         return latch;
1064   }
1065
1066   //  find and reuse unpinned entry
1067
1068 trynext:
1069   entry = bt_availnext (mgr);
1070   latch = mgr->latchsets + entry;
1071
1072   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1073   oldidx = latch->page_no % mgr->latchhash;
1074
1075   //  skip over this entry if latch not available
1076
1077   if( latch->page_no )
1078    if( oldidx != hashidx )
1079     if( !bt_mutextry (mgr->hashtable[oldidx].latch) ) {
1080           bt_releasemutex(latch->modify);
1081           goto trynext;
1082         }
1083
1084   //  update permanent page area in btree from buffer pool
1085   //    no read-lock is required since page is not pinned.
1086
1087   if( latch->dirty )
1088         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1089           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1090         else
1091           latch->dirty = 0;
1092
1093   //  if latch is on a different hash chain
1094   //    unlink from the old page_no chain
1095
1096   if( latch->page_no )
1097    if( oldidx != hashidx ) {
1098         if( latch->prev )
1099           mgr->latchsets[latch->prev].next = latch->next;
1100         else
1101           mgr->hashtable[oldidx].entry = latch->next;
1102
1103         if( latch->next )
1104           mgr->latchsets[latch->next].prev = latch->prev;
1105
1106     bt_releasemutex (mgr->hashtable[oldidx].latch);
1107    }
1108
1109   if( bt_readpage (mgr, page, page_no, 0) )
1110         return mgr->line = __LINE__, NULL;
1111
1112   //  link page as head of hash table chain
1113   //  if this is a never before used entry,
1114   //  or it was previously on a different
1115   //  hash table chain. Otherwise, just
1116   //  leave it in its current hash table
1117   //  chain position.
1118
1119   if( !latch->page_no || hashidx != oldidx ) {
1120         if( latch->next = mgr->hashtable[hashidx].entry )
1121           mgr->latchsets[latch->next].prev = entry;
1122
1123         mgr->hashtable[hashidx].entry = entry;
1124     latch->prev = 0;
1125   }
1126
1127   //  fill in latch structure
1128
1129   latch->pin = CLOCK_bit | 1;
1130   latch->page_no = page_no;
1131   latch->leaf = 0;
1132
1133   bt_releasemutex (latch->modify);
1134   bt_releasemutex (mgr->hashtable[hashidx].latch);
1135   return latch;
1136 }
1137   
1138 void bt_mgrclose (BtMgr *mgr)
1139 {
1140 char *name = mgr->type ? "Main" : "Cache";
1141 BtLatchSet *latch;
1142 BtLogHdr *eof;
1143 uint num = 0;
1144 BtPage page;
1145 uint entry;
1146
1147         //      flush previously written dirty pages
1148         //      and write recovery buffer to disk
1149
1150         fdatasync (mgr->idx);
1151
1152         if( mgr->redoend ) {
1153                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1154                 memset (eof, 0, sizeof(BtLogHdr));
1155         }
1156
1157         //      write remaining dirty pool pages to the btree
1158
1159         for( entry = 1; entry < mgr->latchtotal; entry++ ) {
1160                 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1161                 latch = mgr->latchsets + entry;
1162
1163                 if( latch->dirty ) {
1164                         bt_writepage(mgr, page, latch->page_no, 0);
1165                         latch->dirty = 0, num++;
1166                 }
1167         }
1168
1169         if( num )
1170           fprintf(stderr, "%s %d non-leaf buffer pool pages flushed\n", name, num);
1171
1172         num = 0;
1173
1174         //      write remaining dirty leaf pages to the btree
1175
1176         for( entry = 1; entry < mgr->leaftotal; entry++ ) {
1177                 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1178                 latch = mgr->leafsets + entry;
1179
1180                 if( latch->dirty ) {
1181                         bt_writepage(mgr, page, latch->page_no, 1);
1182                         latch->dirty = 0, num++;
1183                 }
1184         }
1185
1186         if( num )
1187           fprintf(stderr, "%s %d leaf buffer pool pages flushed\n", name, num);
1188
1189         //      clear redo recovery buffer on disk.
1190
1191         if( mgr->pagezero->redopages ) {
1192                 eof = (BtLogHdr *)mgr->redobuff;
1193                 memset (eof, 0, sizeof(BtLogHdr));
1194                 eof->lsn = mgr->lsn;
1195                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1196                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1197         }
1198
1199 #ifdef unix
1200         munmap (mgr->page0, (uid)(mgr->redopage + mgr->pagezero->redopages) << mgr->page_bits);
1201
1202         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1203         munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1204         munmap (mgr->pagezero, mgr->page_size);
1205 #else
1206         FlushViewOfFile(mgr->pagezero, 0);
1207         UnmapViewOfFile(mgr->pagezero);
1208         UnmapViewOfFile(mgr->pagepool);
1209         CloseHandle(mgr->halloc);
1210         CloseHandle(mgr->hpool);
1211 #endif
1212 #ifdef unix
1213         close (mgr->idx);
1214         free (mgr);
1215 #else
1216         FlushFileBuffers(mgr->idx);
1217         CloseHandle(mgr->idx);
1218         GlobalFree (mgr);
1219 #endif
1220 }
1221
1222 //      close and release memory
1223
1224 void bt_close (BtDb *bt)
1225 {
1226         free (bt);
1227 }
1228
1229 //  open/create new btree buffer manager
1230
1231 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1232 //              size of page pool (e.g. 300) and leaf pool (e.g. 4000)
1233
1234 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1235 {
1236 uint lvl, attr, last, slot, idx;
1237 uint nlatchpage, latchhash;
1238 uint nleafpage, leafhash;
1239 unsigned char value[BtId];
1240 int flag, initit = 0;
1241 BtPageZero *pagezero;
1242 BtLatchSet *latch;
1243 off64_t size;
1244 uint amt[1];
1245 BtMgr* mgr;
1246 BtKey* key;
1247 BtVal *val;
1248
1249         // determine sanity of page size and buffer pool
1250
1251         if( leafxtra + pagebits > BT_maxbits )
1252                 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1253
1254         if( pagebits < BT_minbits )
1255                 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1256
1257 #ifdef unix
1258         mgr = calloc (1, sizeof(BtMgr));
1259
1260         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1261
1262         if( mgr->idx == -1 ) {
1263                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1264                 return free(mgr), NULL;
1265         }
1266 #else
1267         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1268         attr = FILE_ATTRIBUTE_NORMAL;
1269         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1270
1271         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1272                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1273                 return GlobalFree(mgr), NULL;
1274         }
1275 #endif
1276
1277 #ifdef unix
1278         pagezero = valloc (BT_maxpage);
1279         *amt = 0;
1280
1281         // read minimum page size to get root info
1282         //      to support raw disk partition files
1283         //      check if page_bits == 0 on the disk.
1284
1285         if( size = lseek (mgr->idx, 0L, 2) )
1286                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1287                         if( pagezero->page_bits ) {
1288                                 pagebits = pagezero->page_bits;
1289                                 leafxtra = pagezero->leaf_xtra;
1290                                 redopages = pagezero->redopages;
1291                         } else
1292                                 initit = 1;
1293                 else
1294                         return free(mgr), free(pagezero), NULL;
1295         else
1296                 initit = 1;
1297 #else
1298         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1299         size = GetFileSize(mgr->idx, amt);
1300
1301         if( size || *amt ) {
1302                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1303                         return bt_mgrclose (mgr), NULL;
1304                 pagebits = pagezero->page_bits;
1305                 leafxtra = pagezero->leaf_xtra;
1306                 redopages = pagezero->redopages;
1307         } else
1308                 initit = 1;
1309 #endif
1310
1311         mgr->page_size = 1 << pagebits;
1312         mgr->page_bits = pagebits;
1313         mgr->leaf_xtra = leafxtra;
1314
1315         //  calculate number of latch hash table entries
1316
1317         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1318
1319         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1320         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1321         mgr->latchtotal = nodemax;
1322
1323         //  calculate number of leaf hash table entries
1324
1325         mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1326
1327         mgr->nleafpage += leafmax << leafxtra;          // size of the leaf pool in pages
1328         mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1329         mgr->leaftotal = leafmax;
1330
1331         mgr->redopage = LEAF_page + (1 << leafxtra);
1332
1333         if( !initit )
1334                 goto mgrlatch;
1335
1336         // initialize an empty b-tree with latch page, root page, page of leaves
1337         // and page(s) of latches and page pool cache
1338
1339         ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1340         memset (pagezero, 0, 1 << pagebits);
1341         pagezero->alloc->lvl = MIN_lvl - 1;
1342         pagezero->redopages = redopages;
1343         pagezero->page_bits = mgr->page_bits;
1344         pagezero->leaf_xtra = leafxtra;
1345
1346         pagezero->alloc->right = mgr->redopage + pagezero->redopages;
1347         pagezero->upperpages = 1;
1348         pagezero->leafpages = 1;
1349
1350         //  initialize left-most LEAF page in
1351         //      alloc->left and count of active leaf pages.
1352
1353         pagezero->alloc->left = LEAF_page;
1354
1355         if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1356                 fprintf (stderr, "Unable to create btree page zero\n");
1357                 return bt_mgrclose (mgr), NULL;
1358         }
1359
1360         memset (pagezero, 0, 1 << pagebits);
1361
1362         for( lvl=MIN_lvl; lvl--; ) {
1363         BtSlot *node = slotptr(pagezero->alloc, 1);
1364                 node->off = mgr->page_size;
1365
1366                 if( !lvl )
1367                         node->off <<= mgr->leaf_xtra;
1368
1369                 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1370                 node->type = Librarian;
1371                 node++->dead = 1;
1372
1373                 node->off = node[-1].off;
1374                 key = keyptr(pagezero->alloc, 2);
1375                 key = keyptr(pagezero->alloc, 1);
1376                 key->len = 2;           // create stopper key
1377                 key->key[0] = 0xff;
1378                 key->key[1] = 0xff;
1379
1380                 bt_putid(value, MIN_lvl - lvl + 1);
1381                 val = valptr(pagezero->alloc, 1);
1382                 val->len = lvl ? BtId : 0;
1383                 memcpy (val->value, value, val->len);
1384
1385                 pagezero->alloc->min = node->off;
1386                 pagezero->alloc->lvl = lvl;
1387                 pagezero->alloc->cnt = 2;
1388                 pagezero->alloc->act = 1;
1389
1390                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1391                         fprintf (stderr, "Unable to create btree page\n");
1392                         return bt_mgrclose (mgr), NULL;
1393                 }
1394         }
1395
1396 mgrlatch:
1397 #ifdef unix
1398         free (pagezero);
1399 #else
1400         VirtualFree (pagezero, 0, MEM_RELEASE);
1401 #endif
1402
1403         // mlock the first segment of 64K pages
1404
1405         flag = PROT_READ | PROT_WRITE;
1406         mgr->page0 = mmap (0, (uid)(mgr->redopage + redopages) << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1407
1408         if( mgr->page0 == MAP_FAILED ) {
1409                 fprintf (stderr, "Unable to mmap pagezero btree segment, error = %d\n", errno);
1410                 return bt_mgrclose (mgr), NULL;
1411         }
1412
1413         mgr->pagezero = (BtPageZero *)mgr->page0;
1414         mlock (mgr->pagezero, mgr->page_size);
1415
1416         mgr->redobuff = mgr->page0 + (mgr->redopage << mgr->page_bits);
1417         mlock (mgr->redobuff, redopages << mgr->page_bits);
1418
1419         //      allocate pool buffers
1420
1421         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1422         if( mgr->pagepool == MAP_FAILED ) {
1423                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1424                 return bt_mgrclose (mgr), NULL;
1425         }
1426
1427         mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1428         if( mgr->leafpool == MAP_FAILED ) {
1429                 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1430                 return bt_mgrclose (mgr), NULL;
1431         }
1432
1433         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1434         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1435         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1436
1437         mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1438         mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1439         mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1440
1441         for( idx = 1; idx < mgr->leaftotal; idx++ ) {
1442                 latch = mgr->leafsets + idx;
1443                 latch->leaf = 1;
1444         }
1445
1446         return mgr;
1447 }
1448
1449 //      open BTree access method
1450 //      based on buffer manager
1451
1452 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1453 {
1454 BtDb *bt = malloc (sizeof(*bt));
1455
1456         memset (bt, 0, sizeof(*bt));
1457         bt->main = main;
1458         bt->mgr = mgr;
1459 #ifdef unix
1460         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1461 #else
1462         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1463 #endif
1464         return bt;
1465 }
1466
1467 //  compare two keys, return > 0, = 0, or < 0
1468 //  =0: keys are same
1469 //  -1: key2 > key1
1470 //  +1: key2 < key1
1471 //  as the comparison value
1472
1473 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1474 {
1475 uint len1 = key1->len;
1476 int ans;
1477
1478         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1479                 return ans;
1480
1481         if( len1 > len2 )
1482                 return 1;
1483         if( len1 < len2 )
1484                 return -1;
1485
1486         return 0;
1487 }
1488
1489 // place write, read, or parent lock on requested page_no.
1490
1491 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1492 {
1493         switch( mode ) {
1494         case BtLockRead:
1495                 ReadLock (latch->readwr, thread_no);
1496                 break;
1497         case BtLockWrite:
1498                 WriteLock (latch->readwr, thread_no, line);
1499                 break;
1500         case BtLockAccess:
1501                 ReadLock (latch->access, thread_no);
1502                 break;
1503         case BtLockDelete:
1504                 WriteLock (latch->access, thread_no, line);
1505                 break;
1506         case BtLockParent:
1507                 WriteLock (latch->parent, thread_no, line);
1508                 break;
1509         case BtLockLink:
1510                 WriteLock (latch->link, thread_no, line);
1511                 break;
1512         }
1513 }
1514
1515 // remove write, read, or parent lock on requested page
1516
1517 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1518 {
1519         switch( mode ) {
1520         case BtLockRead:
1521                 ReadRelease (latch->readwr);
1522                 break;
1523         case BtLockWrite:
1524                 WriteRelease (latch->readwr);
1525                 break;
1526         case BtLockAccess:
1527                 ReadRelease (latch->access);
1528                 break;
1529         case BtLockDelete:
1530                 WriteRelease (latch->access);
1531                 break;
1532         case BtLockParent:
1533                 WriteRelease (latch->parent);
1534                 break;
1535         case BtLockLink:
1536                 WriteRelease (latch->link);
1537                 break;
1538         }
1539 }
1540
1541 //      allocate a new page
1542 //      return with page latched, but unlocked.
1543
1544 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1545 {
1546 uint page_size = mgr->page_size, leaf_xtra = 0;
1547 uid *freechain;
1548 uid page_no;
1549
1550         //      lock allocation page
1551
1552         bt_mutexlock(mgr->lock);
1553
1554         if( contents->lvl ) {
1555                 freechain = &mgr->pagezero->freechain;
1556                 mgr->pagezero->upperpages++;
1557         } else {
1558                 freechain = &mgr->pagezero->leafchain;
1559                 mgr->pagezero->leafpages++;
1560                 leaf_xtra = mgr->leaf_xtra;
1561                 page_size <<= leaf_xtra;
1562         }
1563
1564         // use empty chain first
1565         // else allocate new page
1566
1567         if( page_no = *freechain ) {
1568                 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1569                         set->page = bt_mappage (mgr, set->latch);
1570                 else
1571                         return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1572
1573                 *freechain = set->page->right;
1574
1575                 //  the page is currently nopromote and this
1576                 //  will keep bt_promote out.
1577
1578                 //      contents will replace this bit
1579                 //  and pin will keep bt_promote out
1580
1581                 contents->nopromote = 0;
1582
1583                 memcpy (set->page, contents, page_size);
1584
1585 //              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1586 //                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1587
1588                 bt_releasemutex(mgr->lock);
1589                 return 0;
1590         }
1591
1592         page_no = mgr->pagezero->alloc->right;
1593         mgr->pagezero->alloc->right += 1 << leaf_xtra;
1594
1595         // unlock allocation latch and
1596         //      extend file into new page.
1597
1598 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1599 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1600
1601         bt_releasemutex(mgr->lock);
1602
1603         //      keep bt_promote out of this page
1604         contents->nopromote = 1;
1605
1606         if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1607                 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1608
1609         if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1610                 set->page = bt_mappage (mgr, set->latch);
1611         else
1612                 return mgr->err_thread = thread_no, mgr->err;
1613
1614         // now pin will keep bt_promote out
1615
1616         set->page->nopromote = 0;
1617         return 0;
1618 }
1619
1620 //  find slot in page for given key at a given level
1621
1622 int bt_findslot (BtPage page, unsigned char *key, uint len)
1623 {
1624 uint diff, higher = page->cnt, low = 1, slot;
1625 uint good = 0;
1626
1627         //        make stopper key an infinite fence value
1628
1629         if( page->right )
1630                 higher++;
1631         else
1632                 good++;
1633
1634         //      low is the lowest candidate.
1635         //  loop ends when they meet
1636
1637         //  higher is already
1638         //      tested as .ge. the passed key.
1639
1640         while( diff = higher - low ) {
1641                 slot = low + ( diff >> 1 );
1642                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1643                         low = slot + 1;
1644                 else
1645                         higher = slot, good++;
1646         }
1647
1648         //      return zero if key is on right link page
1649
1650         return good ? higher : 0;
1651 }
1652
1653 //  find and load page at given level for given key
1654 //      leave page rd or wr locked as requested
1655
1656 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1657 {
1658 uid page_no = ROOT_page, prevpage_no = 0;
1659 uint drill = 0xff, slot;
1660 uint mode, prevmode;
1661 BtPageSet prev[1];
1662 BtVal *val;
1663 BtKey *ptr;
1664
1665   //  start at root of btree and drill down
1666
1667   do {
1668         if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1669           set->page = bt_mappage (mgr, set->latch);
1670         else
1671           return 0;
1672
1673         if( page_no > ROOT_page )
1674           bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1675
1676         //      release & unpin parent or left sibling page
1677
1678         if( prevpage_no ) {
1679           bt_unlockpage(prevmode, prev->latch, thread_no, __LINE__);
1680           bt_unpinlatch (prev->latch, 0, thread_no, __LINE__);
1681           prevpage_no = 0;
1682         }
1683
1684         // obtain mode lock using lock coupling through AccessLock
1685         // determine lock mode of drill level
1686
1687         mode = (drill == lvl) ? lock : BtLockRead; 
1688         bt_lockpage(mode, set->latch, thread_no, __LINE__);
1689
1690         // grab our fence key
1691
1692         ptr=fenceptr(set->page);
1693
1694         if( set->page->free )
1695                 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1696
1697         if( page_no > ROOT_page )
1698           bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1699
1700         // re-read and re-lock root after determining actual level of root
1701
1702         if( set->page->lvl != drill) {
1703                 if( set->latch->page_no != ROOT_page )
1704                         return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1705                         
1706                 drill = set->page->lvl;
1707
1708                 if( lock != BtLockRead && drill == lvl ) {
1709                   bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1710                   bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
1711                   continue;
1712                 }
1713         }
1714
1715         prevpage_no = set->latch->page_no;
1716         prevmode = mode;
1717         *prev = *set;
1718
1719         //  if requested key is beyond our fence,
1720         //      slide to the right
1721
1722         if( keycmp (ptr, key, len) < 0 )
1723           if( page_no = set->page->right )
1724                 continue;
1725
1726         //  if page is part of a delete operation,
1727         //      slide to the left;
1728
1729         if( set->page->kill ) {
1730           bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1731           page_no = set->page->left;
1732           bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1733           continue;
1734         }
1735
1736         //  find key on page at this level
1737         //  and descend to requested level
1738
1739         if( slot = bt_findslot (set->page, key, len) ) {
1740           if( drill == lvl )
1741                 return slot;
1742
1743           // find next non-dead slot -- the fence key if nothing else
1744
1745           while( slotptr(set->page, slot)->dead )
1746                 if( slot++ < set->page->cnt )
1747                   continue;
1748                 else
1749                   return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1750
1751           val = valptr(set->page, slot);
1752
1753           if( val->len == BtId )
1754                 page_no = bt_getid(val->value);
1755           else
1756                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1757
1758           drill--;
1759           continue;
1760          }
1761
1762         //  slide right into next page
1763
1764         page_no = set->page->right;
1765
1766   } while( page_no );
1767
1768   // return error on end of right chain
1769
1770   mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1771   return 0;     // return error
1772 }
1773
1774 //      return page to free list
1775 //      page must be delete, link & write locked
1776 //      and have no keys pointing to it.
1777
1778 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1779 {
1780 uid *freechain;
1781
1782         //      lock allocation page
1783
1784         bt_mutexlock (mgr->lock);
1785
1786         if( set->page->lvl ) {
1787                 freechain = &mgr->pagezero->freechain;
1788                 mgr->pagezero->upperpages--;
1789         } else {
1790                 freechain = &mgr->pagezero->leafchain;
1791                 mgr->pagezero->leafpages--;
1792         }
1793
1794         //      store chain link
1795
1796         set->page->right = *freechain;
1797         *freechain = set->latch->page_no;
1798         set->page->free = 1;
1799
1800 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1801 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1802
1803         // unlock released page
1804         // and unlock allocation page
1805
1806         bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1807         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1808         bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
1809         bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
1810         bt_releasemutex (mgr->lock);
1811 }
1812
1813 //      a fence key was deleted from an interiour level page
1814 //      push new fence value upwards
1815
1816 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1817 {
1818 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1819 unsigned char value[BtId];
1820 BtKey* ptr;
1821 uint idx;
1822
1823         //      remove the old fence value
1824
1825         ptr = fenceptr(set->page);
1826         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1827         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1828         set->page->fence = slotptr(set->page, set->page->cnt)->off;
1829
1830         //  cache new fence value
1831
1832         ptr = fenceptr(set->page);
1833         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1834
1835         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1836         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1837
1838         //      insert new (now smaller) fence key
1839
1840         bt_putid (value, set->latch->page_no);
1841         ptr = (BtKey*)leftkey;
1842
1843         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1844           return mgr->err_thread = thread_no, mgr->err;
1845
1846         //      now delete old fence key
1847
1848         ptr = (BtKey*)rightkey;
1849
1850         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1851                 return mgr->err_thread = thread_no, mgr->err;
1852
1853         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1854         bt_unpinlatch(set->latch, 1, thread_no, __LINE__);
1855         return 0;
1856 }
1857
1858 //      root has a single child
1859 //      collapse a level from the tree
1860
1861 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1862 {
1863 BtPageSet child[1];
1864 uid page_no;
1865 BtVal *val;
1866 uint idx;
1867
1868   // find the child entry and promote as new root contents
1869
1870   do {
1871         for( idx = 0; idx++ < root->page->cnt; )
1872           if( !slotptr(root->page, idx)->dead )
1873                 break;
1874
1875         val = valptr(root->page, idx);
1876
1877         if( val->len == BtId )
1878                 page_no = bt_getid (valptr(root->page, idx)->value);
1879         else
1880                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1881
1882         if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1883                 child->page = bt_mappage (mgr, child->latch);
1884         else
1885                 return mgr->err_thread = thread_no, mgr->err;
1886
1887         bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1888         bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
1889
1890         memcpy (root->page, child->page, mgr->page_size);
1891         bt_freepage (mgr, child, thread_no);
1892
1893   } while( root->page->lvl > 1 && root->page->act == 1 );
1894
1895   bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1896   bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
1897   return 0;
1898 }
1899
1900 //  delete a page and manage key
1901 //  call with page writelocked
1902
1903 //      returns with page unpinned
1904 //      from the page pool.
1905
1906 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
1907 {
1908 unsigned char lowerfence[BT_keyarray];
1909 uint page_size = mgr->page_size, kill;
1910 BtPageSet right[1], temp[1];
1911 unsigned char value[BtId];
1912 uid page_no, right2;
1913 BtKey *ptr;
1914
1915         if( !lvl )
1916                 page_size <<= mgr->leaf_xtra;
1917
1918         //  cache original copy of original fence key
1919         //      that is going to be deleted.
1920
1921         ptr = fenceptr(set->page);
1922         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1923
1924         //      pin and lock our right page
1925
1926         page_no = set->page->right;
1927
1928         if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1929                 right->page = bt_mappage (mgr, right->latch);
1930         else
1931                 return 0;
1932
1933         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
1934
1935         if( right->page->kill || set->page->kill )
1936                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1937
1938         // pull contents of right sibling over our empty page
1939         //      preserving our left page number, and its right page number.
1940
1941         bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
1942         page_no = set->page->left;
1943         memcpy (set->page, right->page, page_size);
1944         set->page->left = page_no;
1945         bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
1946
1947         //  fix left link from far right page
1948
1949         if( right2 = set->page->right ) {
1950           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
1951                 temp->page = bt_mappage (mgr, temp->latch);
1952           else
1953                 return 0;
1954
1955           bt_lockpage (BtLockAccess, temp->latch, thread_no, __LINE__);
1956       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
1957           temp->page->left = set->latch->page_no;
1958           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
1959           bt_unlockpage(BtLockAccess, temp->latch, thread_no, __LINE__);
1960           bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
1961         } else if( !lvl ) {     // our page is now rightmost leaf
1962           bt_mutexlock (mgr->lock);
1963           mgr->pagezero->alloc->left = set->latch->page_no;
1964           bt_releasemutex(mgr->lock);
1965         }
1966
1967         // mark right page as being deleted and release lock
1968
1969         right->page->kill = 1;
1970         bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
1971
1972         // redirect the new higher key directly to our new node
1973
1974         ptr = fenceptr(set->page);
1975         bt_putid (value, set->latch->page_no);
1976
1977         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
1978           return mgr->err;
1979
1980         //      delete our original fence key in parent
1981
1982         ptr = (BtKey *)lowerfence;
1983
1984         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1985           return mgr->err;
1986
1987         //  wait for all access to drain away with delete lock,
1988         //      then obtain write lock to right node and free it.
1989
1990         bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
1991         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
1992         bt_lockpage (BtLockLink, right->latch, thread_no, __LINE__);
1993         bt_freepage (mgr, right, thread_no);
1994
1995         //      release write lock to our node
1996
1997         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1998         bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
1999         return 0;
2000 }
2001
2002 //  find and delete key on page by marking delete flag bit
2003 //  if page becomes empty, delete it from the btree
2004
2005 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2006 {
2007 uint slot, idx, found, fence;
2008 BtPageSet set[1];
2009 BtSlot *node;
2010 BtKey *ptr;
2011 BtVal *val;
2012
2013         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2014                 node = slotptr(set->page, slot);
2015                 ptr = keyptr(set->page, slot);
2016         } else
2017                 return mgr->err_thread = thread_no, mgr->err;
2018
2019         // if librarian slot, advance to real slot
2020
2021         if( node->type == Librarian ) {
2022                 ptr = keyptr(set->page, ++slot);
2023                 node = slotptr(set->page, slot);
2024         }
2025
2026         fence = slot == set->page->cnt;
2027
2028         // delete the key, ignore request if already dead
2029
2030         if( found = !keycmp (ptr, key, len) )
2031           if( found = node->dead == 0 ) {
2032                 val = valptr(set->page,slot);
2033                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2034                 set->page->act--;
2035                 node->dead = 1;
2036
2037                 // collapse empty slots beneath the fence
2038                 // on interiour nodes
2039
2040                 if( lvl )
2041                  while( idx = set->page->cnt - 1 )
2042                   if( slotptr(set->page, idx)->dead ) {
2043                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2044                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2045                   } else
2046                         break;
2047           }
2048
2049         if( !found )
2050                 return 0;
2051
2052         //      did we delete a fence key in an upper level?
2053
2054         if( lvl && set->page->act && fence )
2055           return bt_fixfence (mgr, set, lvl, thread_no);
2056
2057         //      do we need to collapse root?
2058
2059         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
2060           return bt_collapseroot (mgr, set, thread_no);
2061
2062         //      delete empty page
2063
2064         if( !set->page->act )
2065           return bt_deletepage (mgr, set, thread_no, set->page->lvl);
2066
2067         bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2068         bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2069         return 0;
2070 }
2071
2072 //      check page for space available,
2073 //      clean if necessary and return
2074 //      0 - page needs splitting
2075 //      >0  new slot value
2076
2077 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2078 {
2079 uint page_size = mgr->page_size;
2080 BtPage page = set->page, frame;
2081 uint cnt = 0, idx = 0;
2082 uint max = page->cnt;
2083 uint newslot = max;
2084 BtKey *key;
2085 BtVal *val;
2086
2087         if( !set->page->lvl )
2088                 page_size <<= mgr->leaf_xtra;
2089
2090         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2091                 return slot;
2092
2093         //      skip cleanup and proceed to split
2094         //      if there's not enough garbage
2095         //      to bother with.
2096
2097         if( page->garbage < page_size / 5 )
2098                 return 0;
2099
2100         frame = malloc (page_size);
2101         memcpy (frame, page, page_size);
2102
2103         // skip page info and set rest of page to zero
2104
2105         memset (page+1, 0, page_size - sizeof(*page));
2106
2107         page->min = page_size;
2108         page->garbage = 0;
2109         page->act = 0;
2110
2111         // clean up page first by
2112         // removing dead keys
2113
2114         while( cnt++ < max ) {
2115                 if( cnt == slot )
2116                         newslot = idx + 2;
2117
2118                 if( cnt < max || frame->lvl )
2119                   if( slotptr(frame,cnt)->dead )
2120                         continue;
2121
2122                 // copy the value across
2123
2124                 val = valptr(frame, cnt);
2125                 page->min -= val->len + sizeof(BtVal);
2126                 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2127
2128                 // copy the key across
2129
2130                 key = keyptr(frame, cnt);
2131                 page->min -= key->len + sizeof(BtKey);
2132                 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2133
2134                 // make a librarian slot
2135
2136                 slotptr(page, ++idx)->off = page->min;
2137                 slotptr(page, idx)->type = Librarian;
2138                 slotptr(page, idx)->dead = 1;
2139
2140                 // set up the slot
2141
2142                 slotptr(page, ++idx)->off = page->min;
2143                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2144
2145                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2146                   page->act++;
2147         }
2148
2149         page->fence = page->min;
2150         page->cnt = idx;
2151         free (frame);
2152
2153         //      see if page has enough space now, or does it need splitting?
2154
2155         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2156                 return newslot;
2157
2158         return 0;
2159 }
2160
2161 // split the root and raise the height of the btree
2162
2163 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2164 {  
2165 unsigned char leftkey[BT_keyarray];
2166 uint nxt = mgr->page_size;
2167 unsigned char value[BtId];
2168 BtPage frame, page;
2169 BtPageSet left[1];
2170 uid left_page_no;
2171 BtKey *ptr;
2172 BtVal *val;
2173
2174         frame = malloc (mgr->page_size);
2175         memcpy (frame, root->page, mgr->page_size);
2176
2177         //      save left page fence key for new root
2178
2179         ptr = fenceptr(root->page);
2180         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2181
2182         //  Obtain an empty page to use, and copy the current
2183         //  root contents into it, e.g. lower keys
2184
2185         if( bt_newpage(mgr, left, frame, thread_no) )
2186                 return mgr->err_thread = thread_no, mgr->err;
2187
2188         left_page_no = left->latch->page_no;
2189         bt_unpinlatch (left->latch, 1, thread_no, __LINE__);
2190         free (frame);
2191
2192         //      left link the pages together
2193
2194         page = bt_mappage (mgr, right);
2195         page->left = left_page_no;
2196
2197         // preserve the page info at the bottom
2198         // of higher keys and set rest to zero
2199
2200         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2201
2202         // insert stopper key at top of newroot page
2203         // and increase the root height
2204
2205         nxt -= BtId + sizeof(BtVal);
2206         bt_putid (value, right->page_no);
2207         val = (BtVal *)((unsigned char *)root->page + nxt);
2208         memcpy (val->value, value, BtId);
2209         val->len = BtId;
2210
2211         nxt -= 2 + sizeof(BtKey);
2212         page->fence = nxt;
2213
2214         slotptr(root->page, 2)->off = nxt;
2215         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2216         ptr->len = 2;
2217         ptr->key[0] = 0xff;
2218         ptr->key[1] = 0xff;
2219
2220         // insert lower keys page fence key on newroot page as first key
2221
2222         nxt -= BtId + sizeof(BtVal);
2223         bt_putid (value, left_page_no);
2224         val = (BtVal *)((unsigned char *)root->page + nxt);
2225         memcpy (val->value, value, BtId);
2226         val->len = BtId;
2227
2228         ptr = (BtKey *)leftkey;
2229         nxt -= ptr->len + sizeof(BtKey);
2230         slotptr(root->page, 1)->off = nxt;
2231         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2232         
2233         root->page->right = 0;
2234         root->page->min = nxt;          // reset lowest used offset and key count
2235         root->page->cnt = 2;
2236         root->page->act = 2;
2237         root->page->lvl++;
2238
2239         mgr->pagezero->alloc->lvl = root->page->lvl;
2240
2241         // release and unpin root pages
2242
2243         bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2244         bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
2245
2246         bt_unpinlatch (right, 1, thread_no, __LINE__);
2247         return 0;
2248 }
2249
2250 //  split already locked full node
2251 //      leave it locked.
2252 //      return pool entry for new right
2253 //      page, pinned & unlocked
2254
2255 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
2256 {
2257 uint page_size = mgr->page_size;
2258 BtPageSet right[1], temp[1];
2259 uint cnt = 0, idx = 0, max;
2260 uint lvl = set->page->lvl;
2261 BtPage frame;
2262 BtKey *key;
2263 BtVal *val;
2264 uid right2;
2265 uint entry;
2266 uint prev;
2267
2268         if( !set->page->lvl )
2269                 page_size <<= mgr->leaf_xtra;
2270
2271         //  split higher half of keys to frame
2272
2273         frame = malloc (page_size);
2274         memset (frame, 0, page_size);
2275         frame->min = page_size;
2276         max = set->page->cnt;
2277         cnt = max / 2;
2278         idx = 0;
2279
2280         while( cnt++ < max ) {
2281                 if( cnt < max || set->page->lvl )
2282                   if( slotptr(set->page, cnt)->dead )
2283                         continue;
2284
2285                 val = valptr(set->page, cnt);
2286                 frame->min -= val->len + sizeof(BtVal);
2287                 memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
2288
2289                 key = keyptr(set->page, cnt);
2290                 frame->min -= key->len + sizeof(BtKey);
2291                 memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
2292
2293                 //      add librarian slot
2294
2295                 slotptr(frame, ++idx)->off = frame->min;
2296                 slotptr(frame, idx)->type = Librarian;
2297                 slotptr(frame, idx)->dead = 1;
2298
2299                 //  add actual slot
2300
2301                 slotptr(frame, ++idx)->off = frame->min;
2302                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2303
2304                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2305                   frame->act++;
2306         }
2307
2308         frame->fence = frame->min;
2309         frame->cnt = idx;
2310         frame->lvl = lvl;
2311
2312         // link right node
2313
2314         if( set->latch->page_no > ROOT_page ) {
2315                 right2 = set->page->right;
2316                 frame->right = right2;
2317
2318                 if( linkleft )
2319                         frame->left = set->latch->page_no;
2320         }
2321
2322         //      get new free page and write higher keys to it.
2323
2324         if( bt_newpage(mgr, right, frame, thread_no) )
2325                 return 0;
2326
2327         //      link far right's left pointer to new page
2328
2329         if( linkleft && set->latch->page_no > ROOT_page )
2330          if( right2 ) {
2331           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2332                 temp->page = bt_mappage (mgr, temp->latch);
2333           else
2334                 return 0;
2335
2336       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2337           temp->page->left = right->latch->page_no;
2338           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2339           bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
2340          } else if( !lvl ) {    // page is rightmost leaf
2341           bt_mutexlock (mgr->lock);
2342           mgr->pagezero->alloc->left = right->latch->page_no;
2343           bt_releasemutex(mgr->lock);
2344          }
2345
2346         // process lower keys
2347
2348         memcpy (frame, set->page, page_size);
2349         memset (set->page+1, 0, page_size - sizeof(*set->page));
2350
2351         set->page->min = page_size;
2352         set->page->garbage = 0;
2353         set->page->act = 0;
2354         max /= 2;
2355         cnt = 0;
2356         idx = 0;
2357
2358         //  assemble page of smaller keys
2359
2360         while( cnt++ < max ) {
2361                 if( slotptr(frame, cnt)->dead )
2362                         continue;
2363                 val = valptr(frame, cnt);
2364                 set->page->min -= val->len + sizeof(BtVal);
2365                 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2366
2367                 key = keyptr(frame, cnt);
2368                 set->page->min -= key->len + sizeof(BtKey);
2369                 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2370
2371                 //      add librarian slot
2372
2373                 slotptr(set->page, ++idx)->off = set->page->min;
2374                 slotptr(set->page, idx)->type = Librarian;
2375                 slotptr(set->page, idx)->dead = 1;
2376
2377                 //      add actual slot
2378
2379                 slotptr(set->page, ++idx)->off = set->page->min;
2380                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2381                 set->page->act++;
2382         }
2383
2384         set->page->right = right->latch->page_no;
2385         set->page->fence = set->page->min;
2386         set->page->cnt = idx;
2387         free(frame);
2388
2389         entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2390         return entry;
2391 }
2392
2393 //      fix keys for newly split page
2394 //      call with both pages pinned & locked
2395 //  return unlocked and unpinned
2396
2397 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2398 {
2399 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2400 unsigned char value[BtId];
2401 uint lvl = set->page->lvl;
2402 BtPageSet temp[1];
2403 BtPage page;
2404 BtKey *ptr;
2405 uid right2;
2406
2407         // if current page is the root page, split it
2408
2409         if( set->latch->page_no == ROOT_page )
2410                 return bt_splitroot (mgr, set, right, thread_no);
2411
2412         ptr = fenceptr(set->page);
2413         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2414
2415         page = bt_mappage (mgr, right);
2416
2417         ptr = fenceptr(page);
2418         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2419
2420         //      splice in far right page's left page_no
2421
2422         if( right2 = page->right ) {
2423           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2424                 temp->page = bt_mappage (mgr, temp->latch);
2425           else
2426                 return 0;
2427
2428       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2429           temp->page->left = right->page_no;
2430           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2431           bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
2432         } else if( !lvl ) {  // right page is far right page
2433           bt_mutexlock (mgr->lock);
2434           mgr->pagezero->alloc->left = right->page_no;
2435           bt_releasemutex(mgr->lock);
2436         }
2437         // insert new fences in their parent pages
2438
2439         bt_lockpage (BtLockParent, right, thread_no, __LINE__);
2440
2441         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2442         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2443
2444         // insert new fence for reformulated left block of smaller keys
2445
2446         bt_putid (value, set->latch->page_no);
2447         ptr = (BtKey *)leftkey;
2448
2449         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2450                 return mgr->err_thread = thread_no, mgr->err;
2451
2452         // switch fence for right block of larger keys to new right page
2453
2454         bt_putid (value, right->page_no);
2455         ptr = (BtKey *)rightkey;
2456
2457         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2458                 return mgr->err_thread = thread_no, mgr->err;
2459
2460         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2461         bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2462
2463         bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2464         bt_unpinlatch (right, 1, thread_no, __LINE__);
2465         return 0;
2466 }
2467
2468 //      install new key and value onto page
2469 //      page must already be checked for
2470 //      adequate space
2471
2472 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2473 {
2474 uint idx, librarian;
2475 BtSlot *node;
2476 BtKey *ptr;
2477 BtVal *val;
2478 int rate;
2479
2480         //      if previous slot is a librarian slot, use it
2481
2482         if( slot > 1 )
2483           if( slotptr(set->page, slot-1)->type == Librarian )
2484                 slot--;
2485
2486         // copy value onto page
2487
2488         set->page->min -= vallen + sizeof(BtVal);
2489         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2490         memcpy (val->value, value, vallen);
2491         val->len = vallen;
2492
2493         // copy key onto page
2494
2495         set->page->min -= keylen + sizeof(BtKey);
2496         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2497         memcpy (ptr->key, key, keylen);
2498         ptr->len = keylen;
2499         
2500         //      find first empty slot at or above our insert slot
2501
2502         for( idx = slot; idx < set->page->cnt; idx++ )
2503           if( slotptr(set->page, idx)->dead )
2504                 break;
2505
2506         // now insert key into array before slot.
2507
2508         //      if we're going all the way to the top,
2509         //      add as many librarian slots as
2510         //      makes sense.
2511
2512         if( idx == set->page->cnt ) {
2513         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2514
2515                 librarian = ++idx - slot;
2516                 avail /= sizeof(BtSlot);
2517
2518                 if( avail < 0 )
2519                         avail = 0;
2520
2521                 if( librarian > avail )
2522                         librarian = avail;
2523
2524                 if( librarian ) {
2525                         rate = (idx - slot) / librarian;
2526                         set->page->cnt += librarian;
2527                         idx += librarian;
2528                 } else
2529                         rate = 0;
2530         } else
2531                 librarian = 0, rate = 0;
2532
2533         //  transfer slots and add librarian slots
2534
2535         while( idx > slot ) {
2536                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2537
2538                 //      add librarian slot per rate
2539
2540                 if( librarian )
2541                  if( (idx - slot)/2 <= librarian * rate ) {
2542                         node = slotptr(set->page, --idx);
2543                         node->off = node[1].off;
2544                         node->type = Librarian;
2545                         node->dead = 1;
2546                         librarian--;
2547                   }
2548
2549                 --idx;
2550         }
2551
2552         set->page->act++;
2553
2554         //      fill in new slot
2555
2556         node = slotptr(set->page, slot);
2557         node->off = set->page->min;
2558         node->type = type;
2559         node->dead = 0;
2560         return 0;
2561 }
2562
2563 //  Insert new key into the btree at given level.
2564 //      either add a new key or update/add an existing one
2565
2566 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2567 {
2568 uint slot, idx, len, entry;
2569 BtPageSet set[1];
2570 BtSlot *node;
2571 BtKey *ptr;
2572 BtVal *val;
2573
2574   while( 1 ) { // find the page and slot for the current key
2575         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2576                 node = slotptr(set->page, slot);
2577                 ptr = keyptr(set->page, slot);
2578         } else
2579                 return mgr->err;
2580
2581         // if librarian slot == found slot, advance to real slot
2582
2583         if( node->type == Librarian ) {
2584                 node = slotptr(set->page, ++slot);
2585                 ptr = keyptr(set->page, slot);
2586         }
2587
2588         //  if inserting a duplicate key or unique
2589         //      key that doesn't exist on the page,
2590         //      check for adequate space on the page
2591         //      and insert the new key before slot.
2592
2593         switch( type ) {
2594         case Unique:
2595         case Duplicate:
2596           if( !keycmp (ptr, key, keylen) )
2597                 break;
2598
2599           if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2600             if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
2601                   return mgr->err;
2602                 else
2603                   goto insxit;
2604
2605           if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2606             if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2607                   continue;
2608
2609           return mgr->err_thread = thread_no, mgr->err;
2610
2611         case Update:
2612           if( keycmp (ptr, key, keylen) )
2613                 goto insxit;
2614
2615           break;
2616         }
2617
2618         // if key already exists, update value and return
2619
2620         val = valptr(set->page, slot);
2621
2622         if( val->len >= vallen ) {
2623           if( node->dead )
2624                 set->page->act++;
2625           node->type = type;
2626           node->dead = 0;
2627
2628           set->page->garbage += val->len - vallen;
2629           val->len = vallen;
2630           memcpy (val->value, value, vallen);
2631           goto insxit;
2632         }
2633
2634         //  new update value doesn't fit in existing value area
2635         //      make sure page has room
2636
2637         if( !node->dead )
2638           set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2639         else
2640           set->page->act++;
2641
2642         node->type = type;
2643         node->dead = 0;
2644
2645         if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2646           break;
2647
2648         if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2649           if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2650                 continue;
2651
2652         return mgr->err_thread = thread_no, mgr->err;
2653   }
2654
2655   //  copy key and value onto page and update slot
2656
2657   set->page->min -= vallen + sizeof(BtVal);
2658   val = (BtVal*)((unsigned char *)set->page + set->page->min);
2659   memcpy (val->value, value, vallen);
2660   val->len = vallen;
2661
2662   set->page->min -= keylen + sizeof(BtKey);
2663   ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2664   memcpy (ptr->key, key, keylen);
2665   ptr->len = keylen;
2666         
2667   slotptr(set->page,slot)->off = set->page->min;
2668
2669 insxit:
2670   bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2671   bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2672   return 0;
2673 }
2674
2675 //      determine actual page where key is located
2676 //  return slot number
2677
2678 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, BtPageSet *set)
2679 {
2680 BtKey *key = keyptr(source,locks[idx].src), *ptr;
2681 uint slot = locks[idx].slot;
2682 uint entry;
2683
2684         if( locks[idx].reuse )
2685           entry = locks[idx-1].entry;
2686         else
2687           entry = locks[idx].entry;
2688
2689         if( slot ) {
2690                 set->latch = mgr->leafsets + entry;
2691                 set->page = bt_mappage (mgr, set->latch);
2692                 return slot;
2693         }
2694
2695         //      find where our key is located 
2696         //      on current page or pages split on
2697         //      same page txn operations.
2698
2699         do {
2700                 set->latch = mgr->leafsets + entry;
2701                 set->page = bt_mappage (mgr, set->latch);
2702
2703                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2704                   if( slotptr(set->page, slot)->type == Librarian )
2705                         slot++;
2706                   if( locks[idx].reuse )
2707                         locks[idx].entry = entry;
2708                   return slot;
2709                 }
2710         } while( entry = set->latch->split );
2711
2712         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2713         return 0;
2714 }
2715
2716 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, ushort thread_no, logseqno lsn)
2717 {
2718 BtKey *key = keyptr(source, locks[idx].src);
2719 BtVal *val = valptr(source, locks[idx].src);
2720 BtLatchSet *latch;
2721 BtPageSet set[1];
2722 uint entry, slot;
2723
2724   while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) {
2725         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2726           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,locks[idx].src)->type) )
2727                 return mgr->err_thread = thread_no, mgr->err;
2728
2729           set->page->lsn = lsn;
2730           return 0;
2731         }
2732
2733         //  split page
2734
2735         if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2736           latch = mgr->leafsets + entry;
2737         else
2738           return mgr->err;
2739
2740         //      splice right page into split chain
2741         //      and WriteLock it
2742
2743         bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2744         latch->split = set->latch->split;
2745         set->latch->split = entry;
2746
2747         // clear slot number for atomic page
2748
2749         locks[idx].slot = 0;
2750   }
2751
2752   return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2753 }
2754
2755 //      perform delete from smaller btree
2756 //  insert a delete slot if not found there
2757
2758 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, ushort thread_no, logseqno lsn)
2759 {
2760 BtKey *key = keyptr(source, locks[idx].src);
2761 BtLatchSet *latch;
2762 uint slot, entry;
2763 BtPageSet set[1];
2764 BtSlot *node;
2765 BtKey *ptr;
2766 BtVal *val;
2767
2768   while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) {
2769         node = slotptr(set->page, slot);
2770         ptr = keyptr(set->page, slot);
2771         val = valptr(set->page, slot);
2772
2773         //  if slot is not found on cache btree, insert a delete slot
2774         //      otherwise ignore the request.
2775
2776         if( keycmp (ptr, key->key, key->len) )
2777          if( !mgr->type )
2778           if( slot = bt_cleanpage(mgr, set, key->len, slot, 0) )
2779                 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete);
2780           else { //  split page before inserting Delete slot
2781                 if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2782                   latch = mgr->leafsets + entry;
2783                 else
2784               return mgr->err;
2785
2786                 //      splice right page into split chain
2787                 //      and WriteLock it
2788
2789                 bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2790                 latch->split = set->latch->split;
2791                 set->latch->split = entry;
2792
2793                 // clear slot number for atomic page
2794
2795                 locks[idx].slot = 0;
2796                 continue;
2797           }
2798          else
2799            return 0;
2800
2801         //      if node is already dead,
2802         //      ignore the request.
2803
2804         if( node->type == Delete || node->dead )
2805                 return 0;
2806
2807         //  if main LSM btree, delete the slot
2808         //      else change to delete type.
2809
2810         if( mgr->type ) {
2811                 set->page->act--;
2812                 node->dead = 1;
2813         } else
2814                 node->type = Delete;
2815
2816         __sync_fetch_and_add(&mgr->found, 1);
2817         set->page->lsn = lsn;
2818         return 0;
2819   }
2820
2821   return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2822 }
2823
2824 //      release master's splits from right to left
2825
2826 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2827 {
2828 BtLatchSet *latch = mgr->leafsets + entry;
2829
2830         if( latch->split )
2831                 bt_atomicrelease (mgr, latch->split, thread_no);
2832
2833         latch->split = 0;
2834         bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2835         bt_unpinlatch(latch, 1, thread_no, __LINE__);
2836 }
2837
2838 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2839 {
2840 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2841 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2842
2843         return keycmp (key1, key2->key, key2->len);
2844 }
2845 //      atomic modification of a batch of keys.
2846
2847 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2848 {
2849 uint src, idx, slot, samepage, entry, que = 0;
2850 BtKey *key, *ptr, *key2;
2851 int result = 0;
2852 BtSlot temp[1];
2853 logseqno lsn;
2854 int type;
2855
2856   // stable sort the list of keys into order to
2857   //    prevent deadlocks between threads.
2858 /*
2859   for( src = 1; src++ < source->cnt; ) {
2860         *temp = *slotptr(source,src);
2861         key = keyptr (source,src);
2862
2863         for( idx = src; --idx; ) {
2864           key2 = keyptr (source,idx);
2865           if( keycmp (key, key2->key, key2->len) < 0 ) {
2866                 *slotptr(source,idx+1) = *slotptr(source,idx);
2867                 *slotptr(source,idx) = *temp;
2868           } else
2869                 break;
2870         }
2871   }
2872 */
2873         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2874   //  add entries to redo log
2875
2876   if( bt->mgr->pagezero->redopages )
2877         lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2878   else
2879         lsn = 0;
2880
2881   //  perform the individual actions in the transaction
2882
2883   if( bt_atomicexec (bt->mgr, source, source->cnt, lsn, 0, bt->thread_no) )
2884         return bt->mgr->err;
2885
2886   // if number of active pages
2887   // is greater than the buffer pool
2888   // promote page into larger btree
2889
2890   if( bt->main )
2891    while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
2892         if( bt_promote (bt) )
2893           return bt->mgr->err;
2894
2895   // return success
2896
2897   return 0;
2898 }
2899
2900 //      execute the source list of inserts/deletes
2901
2902 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, logseqno lsn, int lsm, ushort thread_no)
2903 {
2904 uint slot, src, idx, samepage, entry, outidx;
2905 BtPageSet set[1], prev[1];
2906 unsigned char value[BtId];
2907 BtLatchSet *latch;
2908 uid right_page_no;
2909 AtomicTxn *locks;
2910 BtKey *key, *ptr;
2911 BtPage page;
2912 BtVal *val;
2913
2914   locks = calloc (count, sizeof(AtomicTxn));
2915   memset (set, 0, sizeof(BtPageSet));
2916   outidx = 0;
2917
2918   // Load the leaf page for each key
2919   // group same page references with reuse bit
2920
2921   for( src = 0; src++ < count; ) {
2922         if( slotptr(source,src)->dead )
2923           continue;
2924
2925         key = keyptr(source, src);
2926
2927         // first determine if this modification falls
2928         // on the same page as the previous modification
2929         //      note that the far right leaf page is a special case
2930
2931         if( samepage = !!set->page )
2932           samepage = !set->page->right || keycmp (ptr, key->key, key->len) >= 0;
2933
2934         if( !samepage )
2935           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
2936                 ptr = fenceptr(set->page), set->latch->split = 0;
2937           else
2938                 return mgr->err;
2939         else
2940           slot = 0;
2941
2942         if( slot )
2943          if( slotptr(set->page, slot)->type == Librarian )
2944           slot++;
2945
2946         entry = set->latch - mgr->leafsets;
2947         locks[outidx].reuse = samepage;
2948         locks[outidx].entry = entry;
2949         locks[outidx].slot = slot;
2950         locks[outidx].src = src;
2951
2952         //      capture current lsn for master page
2953
2954         locks[outidx++].reqlsn = set->page->lsn;
2955   }
2956
2957   // insert or delete each key
2958   // process any splits or merges
2959   // run through txn list backwards
2960
2961   samepage = outidx;
2962
2963   for( src = outidx; src--; ) {
2964         if( locks[src].reuse )
2965           continue;
2966
2967         //  perform the txn operations
2968         //      from smaller to larger on
2969         //  the same page
2970
2971         for( idx = src; idx < samepage; idx++ )
2972          switch( slotptr(source,locks[idx].src)->type ) {
2973          case Delete:
2974           if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2975                 return mgr->err;
2976           break;
2977
2978         case Duplicate:
2979         case Unique:
2980           if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2981                 return mgr->err;
2982           break;
2983
2984         default:
2985           bt_atomicpage (mgr, source, locks, idx, set);
2986           break;
2987         }
2988
2989         //      after the same page operations have finished,
2990         //  process master page for splits or deletion.
2991
2992         latch = prev->latch = mgr->leafsets + locks[src].entry;
2993         prev->page = bt_mappage (mgr, prev->latch);
2994         samepage = src;
2995
2996         //  pick-up all splits from master page
2997         //      each one is already pinned & WriteLocked.
2998
2999         while( entry = prev->latch->split ) {
3000           set->latch = mgr->leafsets + entry;
3001           set->page = bt_mappage (mgr, set->latch);
3002
3003           // delete empty master page by undoing its split
3004           //  (this is potentially another empty page)
3005           //  note that there are no pointers to it yet
3006
3007           if( !prev->page->act ) {
3008                 set->page->left = prev->page->left;
3009                 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
3010                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3011                 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3012                 prev->latch->split = set->latch->split;
3013                 bt_freepage (mgr, set, thread_no);
3014                 continue;
3015           }
3016
3017           // remove empty split page from the split chain
3018           // and return it to the free list. No other
3019           // thread has its page number yet.
3020
3021           if( !set->page->act ) {
3022                 prev->page->right = set->page->right;
3023                 prev->latch->split = set->latch->split;
3024
3025                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3026                 bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3027                 bt_freepage (mgr, set, thread_no);
3028                 continue;
3029           }
3030
3031           //  update prev's fence key
3032
3033           ptr = fenceptr(prev->page);
3034           bt_putid (value, prev->latch->page_no);
3035
3036           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3037                 return mgr->err;
3038
3039           // splice in the left link into the split page
3040
3041           set->page->left = prev->latch->page_no;
3042           *prev = *set;
3043         }
3044
3045         //  update left pointer in next right page from last split page
3046         //      (if all splits were reversed or none occurred, latch->split == 0)
3047
3048         if( latch->split ) {
3049           //  fix left pointer in master's original (now split)
3050           //  far right sibling or set rightmost page in page zero
3051
3052           if( right_page_no = prev->page->right ) {
3053                 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3054                   set->page = bt_mappage (mgr, set->latch);
3055                 else
3056                   return mgr->err;
3057
3058             bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3059             set->page->left = prev->latch->page_no;
3060             bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3061                 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
3062           } else {      // prev is rightmost page
3063             bt_mutexlock (mgr->lock);
3064                 mgr->pagezero->alloc->left = prev->latch->page_no;
3065             bt_releasemutex(mgr->lock);
3066           }
3067
3068           //  switch the original fence key from the
3069           //  master page to the last split page.
3070
3071           ptr = fenceptr(prev->page);
3072           bt_putid (value, prev->latch->page_no);
3073
3074           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
3075                 return mgr->err;
3076
3077           //  unlock and unpin the split pages
3078
3079           bt_atomicrelease (mgr, latch->split, thread_no);
3080
3081           //  unlock and unpin the master page
3082
3083           latch->split = 0;
3084           bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3085           bt_unpinlatch(latch, 1, thread_no, __LINE__);
3086           continue;
3087         }
3088
3089         //      since there are no splits remaining, we're
3090         //  finished if master page occupied
3091
3092         if( prev->page->act ) {
3093           bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3094           bt_unpinlatch(prev->latch, 1, thread_no, __LINE__);
3095           continue;
3096         }
3097
3098         // any and all splits were reversed, and the
3099         // master page located in prev is empty, delete it
3100
3101         if( bt_deletepage (mgr, prev, thread_no, 0) )
3102                 return mgr->err;
3103   }
3104
3105   // delete the slots
3106
3107   for( idx = 0; idx++ < count; ) {
3108         if( slotptr(source,idx)->dead )
3109           continue;
3110
3111         slotptr(source,idx)->dead = 1;
3112         source->act--;
3113   }
3114
3115   free (locks);
3116   return 0;
3117 }
3118
3119 //  pick & promote a page into the larger btree
3120
3121 BTERR bt_promote (BtDb *bt)
3122 {
3123 uint entry, slot, idx;
3124 BtPageSet set[1];
3125 BtSlot *node;
3126 BtKey *ptr;
3127 BtVal *val;
3128
3129   while( 1 ) {
3130 #ifdef unix
3131         entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3132 #else
3133         entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3134 #endif
3135         entry %= bt->mgr->leaftotal;
3136
3137         if( !entry )
3138                 continue;
3139
3140         set->latch = bt->mgr->leafsets + entry;
3141
3142         //  skip this entry if it has never been used
3143
3144         if( !set->latch->page_no )
3145           continue;
3146
3147         if( !bt_mutextry(set->latch->modify) )
3148                 continue;
3149
3150         //  skip this entry if it is pinned
3151
3152         if( set->latch->pin & ~CLOCK_bit ) {
3153           bt_releasemutex(set->latch->modify);
3154           continue;
3155         }
3156
3157         set->page = bt_mappage (bt->mgr, set->latch);
3158
3159         // entry has no right sibling
3160
3161         if( !set->page->right ) {
3162           bt_releasemutex(set->latch->modify);
3163           continue;
3164         }
3165
3166         // entry is being killed or constructed
3167
3168         if( set->page->nopromote || set->page->kill ) {
3169           bt_releasemutex(set->latch->modify);
3170           continue;
3171         }
3172
3173         //  pin the page for our access
3174         //      and leave it locked for the
3175         //      duration of the promotion.
3176
3177         set->latch->pin++;
3178         bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3179         bt_releasemutex(set->latch->modify);
3180
3181         // transfer slots in our selected page to the main btree
3182
3183 if( !(entry % 100) )
3184 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
3185
3186         if( bt_atomicexec (bt->main, set->page, set->page->cnt, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3187                 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3188                 return bt->main->err;
3189         }
3190
3191         //  now delete the page
3192
3193         if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3194                 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3195
3196         return bt->mgr->err;
3197   }
3198 }
3199
3200 //      find unique key == given key, or first duplicate key in
3201 //      leaf level and return number of value bytes
3202 //      or (-1) if not found.
3203
3204 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
3205 {
3206 int ret = -1, type;
3207 BtPageSet set[1];
3208 BtSlot *node;
3209 BtKey *ptr;
3210 BtVal *val;
3211 uint slot;
3212
3213  for( type = 0; type < 2; type++ )
3214   if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) {
3215         node = slotptr(set->page, slot);
3216
3217         //      skip librarian slot place holder
3218
3219         if( node->type == Librarian )
3220           node = slotptr(set->page, ++slot);
3221
3222         ptr = keyptr(set->page, slot);
3223
3224         //      not there if we reach the stopper key
3225         //  or the key doesn't match what's on the page.
3226
3227         if( slot == set->page->cnt )
3228           if( !set->page->right ) {
3229                 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3230                 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3231                 continue;
3232         }
3233
3234         if( keycmp (ptr, key, keylen) ) {
3235                 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3236                 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3237                 continue;
3238         }
3239
3240         // key matches, return >= 0 value bytes copied
3241         //      or -1 if not there.
3242
3243         if( node->type == Delete || node->dead ) {
3244                 ret = -1;
3245                 goto findxit;
3246         }
3247
3248         val = valptr (set->page,slot);
3249
3250         if( valmax > val->len )
3251                 valmax = val->len;
3252
3253         memcpy (value, val->value, valmax);
3254         ret = valmax;
3255         goto findxit;
3256   }
3257
3258   ret = -1;
3259
3260 findxit:
3261   if( type < 2 ) {
3262         bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3263         bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3264   }
3265   return ret;
3266 }
3267
3268 //      set cursor to highest slot on right-most page
3269
3270 BTERR bt_lastkey (BtDb *bt)
3271 {
3272 uid cache_page_no = bt->mgr->pagezero->alloc->left;
3273 uid main_page_no = bt->main->pagezero->alloc->left;
3274
3275         if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) )
3276                 bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
3277         else
3278                 return bt->mgr->err;
3279
3280     bt_lockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3281         bt->cacheslot = bt->cacheset->page->cnt;
3282
3283         if( bt->mainset->latch = bt_pinleaf (bt->main, main_page_no, bt->thread_no) )
3284                 bt->mainset->page = bt_mappage (bt->main, bt->mainset->latch);
3285         else
3286                 return bt->main->err;
3287
3288     bt_lockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3289         bt->mainslot = bt->mainset->page->cnt;
3290         bt->phase = 2;
3291         return 0;
3292 }
3293
3294 //      return previous slot on cursor page
3295
3296 uint bt_prevslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3297 {
3298 uid next, us = set->latch->page_no;
3299
3300   while( 1 ) {
3301         while( --slot )
3302           if( slotptr(set->page, slot)->dead )
3303                 continue;
3304           else
3305                 return slot;
3306
3307         next = set->page->left;
3308
3309         if( !next )
3310                 return 0;
3311
3312         do {
3313           bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3314           bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
3315
3316           if( set->latch = bt_pinleaf (mgr, next, thread_no) )
3317             set->page = bt_mappage (mgr, set->latch);
3318           else
3319             return 0;
3320
3321       bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3322           next = set->page->right;
3323
3324         } while( next != us );
3325
3326     slot = set->page->cnt + 1;
3327   }
3328 }
3329
3330 //  advance to previous key
3331
3332 BTERR bt_prevkey (BtDb *bt)
3333 {
3334 int cmp;
3335
3336   // first advance last key(s) one previous slot
3337
3338   while( 1 ) {
3339         switch( bt->phase ) {
3340         case 0:
3341           bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3342           break;
3343         case 1:
3344           bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3345           break;
3346         case 2:
3347           bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3348           bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3349         break;
3350         }
3351
3352   // return next key
3353
3354         if( bt->cacheslot ) {
3355           bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3356           bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3357           bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3358         }
3359
3360         if( bt->mainslot ) {
3361           bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3362           bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3363           bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3364         }
3365
3366         if( bt->mainslot && bt->cacheslot )
3367           cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3368         else if( bt->cacheslot )
3369           cmp = 1;
3370         else if( bt->mainslot )
3371           cmp = -1;
3372         else
3373           return 0;
3374
3375         //  cache key is larger
3376
3377         if( cmp > 0 ) {
3378           bt->phase = 0;
3379           if( bt->cachenode->type == Delete )
3380                 continue;
3381           return bt->cacheslot;
3382         }
3383
3384         //  main key is larger
3385
3386         if( cmp < 0 ) {
3387           bt->phase = 1;
3388           return bt->mainslot;
3389         }
3390
3391         //      keys are equal
3392
3393         bt->phase = 2;
3394
3395         if( bt->cachenode->type == Delete )
3396                 continue;
3397
3398         return bt->cacheslot;
3399   }
3400 }
3401
3402 //      advance to next slot in cache or main btree
3403 //      return 0 for EOF/error
3404
3405 uint bt_nextslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3406 {
3407 BtPage page;
3408 uid page_no;
3409
3410   while( 1 ) {
3411         while( slot++ < set->page->cnt )
3412           if( slotptr(set->page, slot)->dead )
3413                 continue;
3414           else if( slot < set->page->cnt || set->page->right )
3415                 return slot;
3416           else
3417                 return 0;
3418
3419         bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3420         bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
3421
3422         if( page_no = set->page->right )
3423           if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
3424                 set->page = bt_mappage (mgr, set->latch);
3425           else
3426                 return 0;
3427         else
3428           return 0; // EOF
3429
3430         // obtain access lock using lock chaining with Access mode
3431
3432         bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3433         bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3434         bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3435         slot = 0;
3436   }
3437 }
3438
3439 //  advance to next key
3440
3441 BTERR bt_nextkey (BtDb *bt)
3442 {
3443 int cmp;
3444
3445   // first advance last key(s) one next slot
3446
3447   while( 1 ) {
3448         switch( bt->phase ) {
3449         case 0:
3450           bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3451           break;
3452         case 1:
3453           bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3454           break;
3455         case 2:
3456           bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3457           bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3458         break;
3459         }
3460
3461   // return next key
3462
3463         if( bt->cacheslot ) {
3464           bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3465           bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3466           bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3467         }
3468
3469         if( bt->mainslot ) {
3470           bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3471           bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3472           bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3473         }
3474
3475         if( bt->mainslot && bt->cacheslot )
3476           cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3477         else if( bt->mainslot )
3478           cmp = 1;
3479         else if( bt->cacheslot )
3480           cmp = -1;
3481         else
3482           return 0;
3483
3484         //  main key is larger
3485         //      return smaller key
3486
3487         if( cmp < 0 ) {
3488           bt->phase = 0;
3489           if( bt->cachenode->type == Delete )
3490                 continue;
3491           return bt->cacheslot;
3492         }
3493
3494         //  cache key is larger
3495
3496         if( cmp > 0 ) {
3497           bt->phase = 1;
3498           return bt->mainslot;
3499         }
3500
3501         //      keys are equal
3502
3503         bt->phase = 2;
3504
3505         if( bt->cachenode->type == Delete )
3506                 continue;
3507
3508         return bt->cacheslot;
3509   }
3510 }
3511
3512 //  start sweep of keys
3513
3514 BTERR bt_startkey (BtDb *bt, unsigned char *key, uint len)
3515 {
3516 BtPageSet set[1];
3517 uint slot;
3518
3519         // cache btree page
3520
3521         if( slot = bt_loadpage (bt->mgr, bt->cacheset, key, len, 0, BtLockRead, bt->thread_no) )
3522                 bt->cacheslot = slot - 1;
3523         else
3524           return bt->mgr->err;
3525
3526         // main btree page
3527
3528         if( slot = bt_loadpage (bt->main, bt->mainset, key, len, 0, BtLockRead, bt->thread_no) )
3529                 bt->mainslot = slot - 1;
3530         else
3531           return bt->mgr->err;
3532
3533         bt->phase = 2;
3534         return 0;
3535 }
3536
3537 //      flush cache pages to main btree
3538
3539 BTERR bt_flushmain (BtDb *bt)
3540 {
3541 uint count, cnt = 0;
3542 BtPageSet set[1];
3543
3544   while( bt->mgr->pagezero->leafpages > 0 ) {
3545         if( set->latch = bt_pinleaf (bt->mgr, LEAF_page, bt->thread_no) )
3546           set->page = bt_mappage (bt->mgr, set->latch);
3547         else
3548           return bt->mgr->err;
3549
3550         bt_lockpage(BtLockWrite, set->latch, bt->thread_no, __LINE__);
3551         count = set->page->cnt;
3552
3553         if( !set->page->right )
3554                 count--;
3555
3556 if( !(cnt++ % 100) )
3557 fprintf(stderr, "Promote LEAF_page %d with %d keys\n", cnt, set->page->act);
3558
3559         if( bt_atomicexec (bt->main, set->page, count, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) )
3560           return bt->mgr->line = bt->main->line, bt->mgr->err = bt->main->err;
3561
3562         if( set->page->right )
3563           if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3564                 return bt->mgr->err;
3565           else
3566                 continue;
3567
3568         bt_unlockpage(BtLockWrite, set->latch, bt->thread_no, __LINE__);
3569         bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3570         return 0;
3571   }
3572
3573   //  leaf page count is off
3574
3575   bt->mgr->err_thread = bt->thread_no, bt->mgr->line = __LINE__;
3576   return bt->mgr->err = BTERR_ovflw;
3577 }
3578
3579 #ifdef STANDALONE
3580
3581 #ifndef unix
3582 double getCpuTime(int type)
3583 {
3584 FILETIME crtime[1];
3585 FILETIME xittime[1];
3586 FILETIME systime[1];
3587 FILETIME usrtime[1];
3588 SYSTEMTIME timeconv[1];
3589 double ans = 0;
3590
3591         memset (timeconv, 0, sizeof(SYSTEMTIME));
3592
3593         switch( type ) {
3594         case 0:
3595                 GetSystemTimeAsFileTime (xittime);
3596                 FileTimeToSystemTime (xittime, timeconv);
3597                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3598                 break;
3599         case 1:
3600                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3601                 FileTimeToSystemTime (usrtime, timeconv);
3602                 break;
3603         case 2:
3604                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3605                 FileTimeToSystemTime (systime, timeconv);
3606                 break;
3607         }
3608
3609         ans += (double)timeconv->wHour * 3600;
3610         ans += (double)timeconv->wMinute * 60;
3611         ans += (double)timeconv->wSecond;
3612         ans += (double)timeconv->wMilliseconds / 1000;
3613         return ans;
3614 }
3615 #else
3616 #include <time.h>
3617 #include <sys/resource.h>
3618
3619 double getCpuTime(int type)
3620 {
3621 struct rusage used[1];
3622 struct timeval tv[1];
3623
3624         switch( type ) {
3625         case 0:
3626                 gettimeofday(tv, NULL);
3627                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3628
3629         case 1:
3630                 getrusage(RUSAGE_SELF, used);
3631                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3632
3633         case 2:
3634                 getrusage(RUSAGE_SELF, used);
3635                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3636         }
3637
3638         return 0;
3639 }
3640 #endif
3641
3642 void bt_poolaudit (BtMgr *mgr, char *type)
3643 {
3644 BtLatchSet *latch, test[1];
3645 uint entry;
3646
3647         memset (test, 0, sizeof(test));
3648
3649         if( memcmp (test, mgr->latchsets, sizeof(test)) )
3650                 fprintf(stderr, "%s latchset zero overwritten\n", type);
3651
3652         if( memcmp (test, mgr->leafsets, sizeof(test)) )
3653                 fprintf(stderr, "%s leafset zero overwritten\n", type);
3654
3655         for( entry = 0; ++entry < mgr->latchtotal; ) {
3656                 latch = mgr->latchsets + entry;
3657
3658                 if( latch->leaf )
3659                         fprintf(stderr, "%s latchset %d is a leaf on page %d\n", type, entry, latch->page_no);
3660
3661                 if( *latch->modify->value )
3662                         fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
3663
3664                 if( latch->pin & ~CLOCK_bit )
3665                         fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3666         }
3667
3668         for( entry = 0; ++entry < mgr->leaftotal; ) {
3669                 latch = mgr->leafsets + entry;
3670
3671                 if( !latch->leaf )
3672                         fprintf(stderr, "%s leafset %d is not a leaf on page %d\n", type, entry, latch->page_no);
3673
3674                 if( *latch->modify->value )
3675                         fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
3676
3677                 if( latch->pin & ~CLOCK_bit )
3678                         fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3679         }
3680 }
3681
3682 typedef struct {
3683         char idx;
3684         char *type;
3685         char *infile;
3686         BtMgr *main;
3687         BtMgr *mgr;
3688         int num;
3689 } ThreadArg;
3690
3691 //  standalone program to index file of keys
3692 //  then list them onto std-out
3693
3694 #ifdef unix
3695 void *index_file (void *arg)
3696 #else
3697 uint __stdcall index_file (void *arg)
3698 #endif
3699 {
3700 int line = 0, found = 0, cnt = 0, cachecnt, idx;
3701 uid next, page_no = LEAF_page;  // start on first page of leaves
3702 int ch, len = 0, slot, type = 0;
3703 unsigned char key[BT_maxkey];
3704 unsigned char buff[65536];
3705 uint nxt = sizeof(buff);
3706 ThreadArg *args = arg;
3707 uint counts[8][2];
3708 BtPageSet set[1];
3709 BtPage page;
3710 int vallen;
3711 BtKey *ptr;
3712 BtVal *val;
3713 uint size;
3714 BtDb *bt;
3715 FILE *in;
3716
3717         bt = bt_open (args->mgr, args->main);
3718         page = (BtPage)buff;
3719
3720         if( args->idx < strlen (args->type) )
3721                 ch = args->type[args->idx];
3722         else
3723                 ch = args->type[strlen(args->type) - 1];
3724
3725         switch(ch | 0x20)
3726         {
3727         case 'm':
3728           fprintf(stderr, "started flushing cache to main btree\n");
3729
3730           if( bt->main )
3731                 if( bt_flushmain(bt) )
3732                   fprintf(stderr, "Error %d Line: %d thread: %d\n", bt->mgr->err, bt->mgr->line, bt->thread_no), exit(0);
3733
3734           break;
3735
3736         case 'd':
3737                 type = Delete;
3738
3739         case 'p':
3740                 if( !type )
3741                         type = Unique;
3742
3743                 if( args->num )
3744                  if( type == Delete )
3745                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3746                  else
3747                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3748                 else
3749                  if( type == Delete )
3750                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3751                  else
3752                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3753
3754                 if( in = fopen (args->infile, "rb") )
3755                   while( ch = getc(in), ch != EOF )
3756                         if( ch == '\n' )
3757                         {
3758                           line++;
3759
3760                           if( !args->num ) {
3761                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3762                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3763                             len = 0;
3764                                 continue;
3765                           }
3766
3767                           nxt -= len - 10;
3768                           memcpy (buff + nxt, key + 10, len - 10);
3769                           nxt -= 1;
3770                           buff[nxt] = len - 10;
3771                           nxt -= 10;
3772                           memcpy (buff + nxt, key, 10);
3773                           nxt -= 1;
3774                           buff[nxt] = 10;
3775                           slotptr(page,++cnt)->off  = nxt;
3776                           slotptr(page,cnt)->type = type;
3777                           slotptr(page,cnt)->dead = 0;
3778                           len = 0;
3779
3780                           if( cnt < args->num )
3781                                 continue;
3782
3783                           page->cnt = cnt;
3784                           page->act = cnt;
3785                           page->min = nxt;
3786
3787                           if( bt_atomictxn (bt, page) )
3788                                 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3789                           nxt = sizeof(buff);
3790                           cnt = 0;
3791
3792                         }
3793                         else if( len < BT_maxkey )
3794                                 key[len++] = ch;
3795                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3796                 break;
3797
3798         case 'w':
3799                 fprintf(stderr, "started indexing for %s\n", args->infile);
3800                 if( in = fopen (args->infile, "r") )
3801                   while( ch = getc(in), ch != EOF )
3802                         if( ch == '\n' )
3803                         {
3804                           line++;
3805
3806                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3807                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3808                           len = 0;
3809                         }
3810                         else if( len < BT_maxkey )
3811                                 key[len++] = ch;
3812                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3813                 break;
3814
3815         case 'f':
3816                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3817                 if( in = fopen (args->infile, "rb") )
3818                   while( ch = getc(in), ch != EOF )
3819                         if( ch == '\n' )
3820                         {
3821                           line++;
3822                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3823                                 found++;
3824                           else if( bt->mgr->err )
3825                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3826                           len = 0;
3827                         }
3828                         else if( len < BT_maxkey )
3829                                 key[len++] = ch;
3830                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3831                 break;
3832
3833         case 's':
3834                 fprintf(stderr, "started forward scan\n");
3835                 if( bt_startkey (bt, NULL, 0) )
3836                         fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3837
3838                 while( bt_nextkey (bt) ) {
3839                   if( bt->phase == 1 ) {
3840                         len = bt->mainkey->len;
3841
3842                         if( bt->mainnode->type == Duplicate )
3843                                 len -= BtId;
3844
3845                         fwrite (bt->mainkey->key, len, 1, stdout);
3846                         fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3847                   } else {
3848                         len = bt->cachekey->len;
3849
3850                         if( bt->cachenode->type == Duplicate )
3851                                 len -= BtId;
3852
3853                         fwrite (bt->cachekey->key, len, 1, stdout);
3854                         fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3855                   }
3856
3857                   fputc ('\n', stdout);
3858                   cnt++;
3859             }
3860
3861                 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3862                 bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
3863
3864                 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3865                 bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
3866
3867                 fprintf(stderr, " Total keys read %d\n", cnt);
3868                 break;
3869
3870         case 'r':
3871                 fprintf(stderr, "started reverse scan\n");
3872                 if( bt_lastkey (bt) )
3873                         fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3874
3875                 while( bt_prevkey (bt) ) {
3876                   if( bt->phase == 1 ) {
3877                         len = bt->mainkey->len;
3878
3879                         if( bt->mainnode->type == Duplicate )
3880                                 len -= BtId;
3881
3882                         fwrite (bt->mainkey->key, len, 1, stdout);
3883                         fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3884                   } else {
3885                         len = bt->cachekey->len;
3886
3887                         if( bt->cachenode->type == Duplicate )
3888                                 len -= BtId;
3889
3890                         fwrite (bt->cachekey->key, len, 1, stdout);
3891                         fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3892                   }
3893
3894                   fputc ('\n', stdout);
3895                   cnt++;
3896             }
3897
3898                 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3899                 bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
3900
3901                 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3902                 bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
3903
3904                 fprintf(stderr, " Total keys read %d\n", cnt);
3905                 break;
3906
3907         case 'c':
3908                 fprintf(stderr, "started counting LSM cache btree\n");
3909                 next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
3910                 memset (counts, 0, sizeof(counts));
3911                 page_no = LEAF_page;
3912                 size = bt->mgr->page_size << bt->mgr->leaf_xtra;
3913                 page = malloc(size);
3914
3915 #ifdef unix
3916                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3917 #endif
3918                 while( page_no < bt->mgr->pagezero->alloc->right ) {
3919                   pread(bt->mgr->idx, page, size, page_no << bt->mgr->page_bits);
3920                   if( !page->lvl && !page->free ) {
3921                     cnt += page->act;
3922
3923                         for( idx = 0; idx++ < page->cnt; ) {
3924                         BtSlot *node = slotptr (page, idx);
3925                           counts[node->type][node->dead]++;
3926                         }
3927                   }
3928                   if( next )
3929                         page_no = next;
3930                   else
3931                         page_no += page->lvl ? 1 : (1 << bt->mgr->leaf_xtra);
3932                   next = 0;
3933                 }
3934                 
3935                 cachecnt = --cnt;       // remove stopper key
3936                 counts[Unique][0]--;
3937
3938                 fprintf(stderr, "  Unique    : %d  dead: %d\n", counts[Unique][0], counts[Unique][1]);
3939                 fprintf(stderr, "  Duplicates: %d  dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]);
3940                 fprintf(stderr, "  Librarian : %d  dead: %d\n", counts[Librarian][0], counts[Librarian][1]);
3941                 fprintf(stderr, "  Deletion  : %d  dead: %d\n", counts[Delete][0], counts[Delete][1]);
3942                 fprintf(stderr, "total cache keys count: %d\n", cachecnt);
3943                 free (page);
3944
3945                 fprintf(stderr, "started counting LSM main btree\n");
3946                 next = bt->main->redopage + bt->main->pagezero->redopages;
3947                 memset (counts, 0, sizeof(counts));
3948                 size = bt->main->page_size << bt->main->leaf_xtra;
3949                 page = malloc(size);
3950                 page_no = LEAF_page;
3951                 cnt = 0;
3952
3953 #ifdef unix
3954                 posix_fadvise( bt->main->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3955 #endif
3956                 while( page_no < bt->main->pagezero->alloc->right ) {
3957                   pread(bt->main->idx, page, size, page_no << bt->main->page_bits);
3958                   if( !page->lvl && !page->free ) {
3959                     cnt += page->act;
3960
3961                         for( idx = 0; idx++ < page->cnt; ) {
3962                         BtSlot *node = slotptr (page, idx);
3963                           counts[node->type][node->dead]++;
3964                         }
3965                   }
3966                   if( next )
3967                         page_no = next;
3968                   else
3969                         page_no += page->lvl ? 1 : (1 << bt->main->leaf_xtra);
3970                   next = 0;
3971                 }
3972                 
3973                 cnt--;  // remove stopper key
3974                 counts[Unique][0]--;
3975
3976                 fprintf(stderr, "  Unique    : %d  dead: %d\n", counts[Unique][0], counts[Unique][1]);
3977                 fprintf(stderr, "  Duplicates: %d  dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]);
3978                 fprintf(stderr, "  Librarian : %d  dead: %d\n", counts[Librarian][0], counts[Librarian][1]);
3979                 fprintf(stderr, "  Deletion  : %d  dead: %d\n", counts[Delete][0], counts[Delete][1]);
3980                 fprintf(stderr, "total main keys count : %d\n", cnt);
3981                 fprintf(stderr, "Total keys counted    : %d\n", cnt + cachecnt);
3982                 free (page);
3983                 break;
3984         }
3985
3986         bt_close (bt);
3987 #ifdef unix
3988         return NULL;
3989 #else
3990         return 0;
3991 #endif
3992 }
3993
3994 typedef struct timeval timer;
3995
3996 int main (int argc, char **argv)
3997 {
3998 int idx, cnt, len, slot, err;
3999 double start, stop;
4000 #ifdef unix
4001 pthread_t *threads;
4002 #else
4003 HANDLE *threads;
4004 #endif
4005 ThreadArg *args;
4006 uint mainleafpool = 0;
4007 uint mainleafxtra = 0;
4008 uint redopages = 0;
4009 uint poolsize = 0;
4010 uint leafpool = 0;
4011 uint leafxtra = 0;
4012 uint mainpool = 0;
4013 uint mainbits = 0;
4014 int bits = 16;
4015 float elapsed;
4016 int num = 0;
4017 char key[1];
4018 BtMgr *main;
4019 BtMgr *mgr;
4020 BtKey *ptr;
4021
4022         if( argc < 3 ) {
4023                 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
4024                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
4025                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
4026                 fprintf (stderr, "  cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort/(c)ount/(m)ainflush, with a one character command for each input src_file. A command can also be given with no input file\n");
4027                 fprintf (stderr, "  pagebits is the page size in bits for the cache btree\n");
4028                 fprintf (stderr, "  leafbits is the number of xtra bits for a leaf page\n");
4029                 fprintf (stderr, "  poolsize is the number of pages in buffer pool for the cache btree\n");
4030                 fprintf (stderr, "  leafpool is the number of leaf pages in leaf pool for the cache btree\n");
4031                 fprintf (stderr, "  txnsize = n to block transactions into n units, or zero for no transactions\n");
4032                 fprintf (stderr, "  redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
4033                 fprintf (stderr, "  mainbits is the page size of the main btree in bits\n");
4034                 fprintf (stderr, "  mainpool is the number of main pages in the main buffer pool\n");
4035                 fprintf (stderr, "  mainleaf is the number of main pages in the main leaf pool\n");
4036                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
4037                 exit(0);
4038         }
4039
4040         start = getCpuTime(0);
4041
4042         if( argc > 4 )
4043                 bits = atoi(argv[4]);
4044
4045         if( argc > 5 )
4046                 leafxtra = atoi(argv[5]);
4047
4048         if( argc > 6 )
4049                 poolsize = atoi(argv[6]);
4050
4051         if( !poolsize )
4052                 fprintf (stderr, "no page pool\n"), exit(1);
4053
4054         if( argc > 7 )
4055                 leafpool = atoi(argv[7]);
4056
4057         if( argc > 8 )
4058                 num = atoi(argv[8]);
4059
4060         if( argc > 9 )
4061                 redopages = atoi(argv[9]);
4062
4063         if( redopages > 65535 )
4064                 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
4065
4066         if( argc > 10 )
4067                 mainbits = atoi(argv[10]);
4068
4069         if( argc > 11 )
4070                 mainleafxtra = atoi(argv[11]);
4071
4072         if( argc > 12 )
4073                 mainpool = atoi(argv[12]);
4074
4075         if( argc > 13 )
4076                 mainleafpool = atoi(argv[13]);
4077
4078         cnt = argc - 14;
4079 #ifdef unix
4080         threads = malloc (cnt * sizeof(pthread_t));
4081 #else
4082         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
4083 #endif
4084         args = malloc ((cnt + 1) * sizeof(ThreadArg));
4085
4086         mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
4087
4088         if( !mgr ) {
4089                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
4090                 exit (1);
4091         } else
4092                 mgr->type = 0;
4093
4094         if( mainbits ) {
4095                 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
4096
4097                 if( !main ) {
4098                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
4099                         exit (1);
4100                 } else
4101                         main->type = 1;
4102         } else
4103                 main = NULL;
4104
4105         //      fire off threads
4106
4107         if( cnt )
4108           for( idx = 0; idx < cnt; idx++ ) {
4109                 args[idx].infile = argv[idx + 14];
4110                 args[idx].type = argv[3];
4111                 args[idx].main = main;
4112                 args[idx].mgr = mgr;
4113                 args[idx].num = num;
4114                 args[idx].idx = idx;
4115 #ifdef unix
4116                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
4117                         fprintf(stderr, "Error creating thread %d\n", err);
4118 #else
4119                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
4120 #endif
4121           }
4122         else {
4123                 args[0].infile = argv[idx + 10];
4124                 args[0].type = argv[3];
4125                 args[0].main = main;
4126                 args[0].mgr = mgr;
4127                 args[0].num = num;
4128                 args[0].idx = 0;
4129                 index_file (args);
4130         }
4131
4132         //      wait for termination
4133
4134 #ifdef unix
4135         for( idx = 0; idx < cnt; idx++ )
4136                 pthread_join (threads[idx], NULL);
4137 #else
4138         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
4139
4140         for( idx = 0; idx < cnt; idx++ )
4141                 CloseHandle(threads[idx]);
4142 #endif
4143         bt_poolaudit(mgr, "cache");
4144
4145         if( main )
4146                 bt_poolaudit(main, "main");
4147
4148         fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found);
4149         if( main )
4150                 fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
4151
4152         bt_mgrclose (mgr);
4153
4154         if( main )
4155                 bt_mgrclose (main);
4156
4157         elapsed = getCpuTime(0) - start;
4158         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4159         elapsed = getCpuTime(1);
4160         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4161         elapsed = getCpuTime(2);
4162         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4163 }
4164
4165 BtKey *bt_fence (BtPage page)
4166 {
4167 return fenceptr(page);
4168 }
4169
4170 BtKey *bt_key (BtPage page, uint slot)
4171 {
4172 return keyptr(page,slot);
4173 }
4174
4175 BtSlot *bt_slot (BtPage page, uint slot)
4176 {
4177 return slotptr(page,slot);
4178 }
4179 #endif  //STANDALONE