]> pd.if.org Git - btree/blob - threadskv10g.c
Add finalized threadskv10g LSM btree.
[btree] / threadskv10g.c
1 // btree version threadskv10g futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair re-entrant reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      LSM B-trees for write optimization
11 //      larger sized leaf pages than non-leaf
12 //      and LSM B-tree find & count operations
13
14 // 28 OCT 2014
15
16 // author: karl malbrain, malbrain@cal.berkeley.edu
17
18 /*
19 This work, including the source code, documentation
20 and related data, is placed into the public domain.
21
22 The orginal author is Karl Malbrain.
23
24 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
25 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
26 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
27 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
28 RESULTING FROM THE USE, MODIFICATION, OR
29 REDISTRIBUTION OF THIS SOFTWARE.
30 */
31
32 // Please see the project home page for documentation
33 // code.google.com/p/high-concurrency-btree
34
35 #define _FILE_OFFSET_BITS 64
36 #define _LARGEFILE64_SOURCE
37
38 #ifdef linux
39 #define _GNU_SOURCE
40 #include <xmmintrin.h>
41 #include <linux/futex.h>
42 #define SYS_futex 202
43 #endif
44
45 #ifdef unix
46 #include <unistd.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <fcntl.h>
50 #include <sys/time.h>
51 #include <sys/mman.h>
52 #include <errno.h>
53 #include <pthread.h>
54 #include <limits.h>
55 #else
56 #define WIN32_LEAN_AND_MEAN
57 #include <windows.h>
58 #include <stdio.h>
59 #include <stdlib.h>
60 #include <time.h>
61 #include <fcntl.h>
62 #include <process.h>
63 #include <intrin.h>
64 #endif
65
66 #include <memory.h>
67 #include <string.h>
68 #include <stddef.h>
69
70 typedef unsigned long long      uid;
71 typedef unsigned long long      logseqno;
72
73 #ifndef unix
74 typedef unsigned long long      off64_t;
75 typedef unsigned short          ushort;
76 typedef unsigned int            uint;
77 #endif
78
79 #define BT_ro 0x6f72    // ro
80 #define BT_rw 0x7772    // rw
81
82 #define BT_maxbits              26                                      // maximum page size in bits
83 #define BT_minbits              9                                       // minimum page size in bits
84 #define BT_minpage              (1 << BT_minbits)       // minimum page size
85 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
86
87 //  BTree page number constants
88 #define ALLOC_page              0       // allocation page
89 #define ROOT_page               1       // root of the btree
90 #define LEAF_page               2       // first page of leaves
91
92 //      Number of levels to create in a new BTree
93
94 #define MIN_lvl                 2
95
96 /*
97 There are six lock types for each node in four independent sets: 
98 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
99 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
100 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
101 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
102 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
103 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification. 
104 */
105
106 typedef enum{
107         BtLockAccess = 1,
108         BtLockDelete = 2,
109         BtLockRead   = 4,
110         BtLockWrite  = 8,
111         BtLockParent = 16,
112         BtLockLink   = 32
113 } BtLock;
114
115 typedef struct {
116   union {
117         struct {
118           volatile unsigned char xcl[1];
119           volatile unsigned char filler;
120           volatile ushort waiters[1];
121         } bits[1];
122         uint value[1];
123   };
124 } MutexLatch;
125
126 //      definition for reader/writer reentrant lock implementation
127
128 typedef struct {
129   MutexLatch xcl[1];
130   MutexLatch wrt[1];
131   ushort readers;
132   ushort dup;           // re-entrant locks
133   ushort tid;           // owner thread-no
134   ushort line;          // owner line #
135 } RWLock;
136
137 //  hash table entries
138
139 typedef struct {
140         MutexLatch latch[1];
141         uint entry;             // Latch table entry at head of chain
142 } BtHashEntry;
143
144 //      latch manager table structure
145
146 typedef struct {
147         uid page_no;                    // latch set page number
148         RWLock readwr[1];               // read/write page lock
149         RWLock access[1];               // Access Intent/Page delete
150         RWLock parent[1];               // Posting of fence key in parent
151         RWLock link[1];                 // left link update in progress
152         MutexLatch modify[1];   // modify entry lite latch
153         uint split;                             // right split page atomic insert
154         uint next;                              // next entry in hash table chain
155         uint prev;                              // prev entry in hash table chain
156         volatile ushort pin;    // number of accessing threads
157         unsigned char dirty;    // page in cache is dirty
158         unsigned char leaf;             // page in cache is a leaf
159 } BtLatchSet;
160
161 //      Define the length of the page record numbers
162
163 #define BtId 6
164
165 //      Page key slot definition.
166
167 //      Keys are marked dead, but remain on the page until
168 //      it cleanup is called. The fence key (highest key) for
169 //      a leaf page is always present, even after cleanup.
170
171 //      Slot types
172
173 //      In addition to the Unique keys that occupy slots
174 //      there are Librarian and Duplicate key
175 //      slots occupying the key slot array.
176
177 //      The Librarian slots are dead keys that
178 //      serve as filler, available to add new Unique
179 //      or Dup slots that are inserted into the B-tree.
180
181 //      The Duplicate slots have had their key bytes extended
182 //      by 6 bytes to contain a binary duplicate key uniqueifier.
183
184 typedef enum {
185         Unique,
186         Update,
187         Librarian,
188         Duplicate,
189         Delete
190 } BtSlotType;
191
192 typedef struct {
193         uint off:BT_maxbits;    // page offset for key start
194         uint type:3;                    // type of slot
195         uint dead:1;                    // set for deleted slot
196 } BtSlot;
197
198 //      The key structure occupies space at the upper end of
199 //      each page.  It's a length byte followed by the key
200 //      bytes.
201
202 typedef struct {
203         unsigned char len;              // this can be changed to a ushort or uint
204         unsigned char key[0];
205 } BtKey;
206
207 //      the value structure also occupies space at the upper
208 //      end of the page. Each key is immediately followed by a value.
209
210 typedef struct {
211         unsigned char len;              // this can be changed to a ushort or uint
212         unsigned char value[0];
213 } BtVal;
214
215 #define BT_maxkey       255             // maximum number of bytes in a key
216 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
217
218 //      The first part of an index page.
219 //      It is immediately followed
220 //      by the BtSlot array of keys.
221
222 typedef struct BtPage_ {
223         uint cnt;                                       // count of keys in page
224         uint act;                                       // count of active keys
225         uint min;                                       // next key offset
226         uint garbage;                           // page garbage in bytes
227         unsigned char lvl;                      // level of page
228         unsigned char free;                     // page is on free chain
229         unsigned char kill;                     // page is being deleted
230         unsigned char nopromote;        // page is being constructed
231         unsigned char filler1[6];       // padding to multiple of 8 bytes
232         unsigned char right[BtId];      // page number to right
233         unsigned char left[BtId];       // page number to left
234         unsigned char filler2[2];       // padding to multiple of 8 bytes
235         logseqno lsn;                           // log sequence number applied
236         uid page_no;                            // this page number
237 } *BtPage;
238
239 //  The loadpage interface object
240
241 typedef struct {
242         BtPage page;            // current page pointer
243         BtLatchSet *latch;      // current page latch set
244 } BtPageSet;
245
246 //      structure for latch manager on ALLOC_page
247
248 typedef struct {
249         struct BtPage_ alloc[1];                // next page_no in right ptr
250         unsigned char freechain[BtId];  // head of free page_nos chain
251         unsigned char leafchain[BtId];  // head of leaf page_nos chain
252         unsigned long long leafpages;   // number of active leaf pages
253         unsigned long long upperpages;  // number of active upper pages
254         uint redopages;                                 // number of redo pages in file
255         unsigned char leaf_xtra;                // leaf page size in xtra bits
256         unsigned char page_bits;                // base page size in bits
257 } BtPageZero;
258
259 //      The object structure for Btree access
260
261 typedef struct {
262         uint page_size;                         // base page size       
263         uint page_bits;                         // base page size in bits       
264         uint leaf_xtra;                         // leaf xtra bits       
265 #ifdef unix
266         int idx;
267 #else
268         HANDLE idx;
269 #endif
270         BtPageZero *pagezero;           // mapped allocation page
271         BtHashEntry *hashtable;         // the buffer pool hash table entries
272         BtHashEntry *leaftable;         // the buffer pool hash table entries
273         BtLatchSet *latchsets;          // mapped latch set from buffer pool
274         BtLatchSet *leafsets;           // mapped latch set from buffer pool
275         unsigned char *pagepool;        // mapped to the buffer pool pages
276         unsigned char *leafpool;        // mapped to the leaf pool pages
277         unsigned char *redobuff;        // mapped recovery buffer pointer
278         logseqno lsn, flushlsn;         // current & first lsn flushed
279         MutexLatch redo[1];                     // redo area lite latch
280         MutexLatch lock[1];                     // allocation area lite latch
281         MutexLatch maps[1];                     // mapping segments lite latch
282         ushort thread_no[1];            // highest thread number issued
283         ushort err_thread;                      // error thread number
284         uint nlatchpage;                        // size of buffer pool & latchsets
285         uint latchtotal;                        // number of page latch entries
286         uint latchhash;                         // number of latch hash table slots
287         uint latchvictim;                       // next latch entry to examine
288         uint nleafpage;                         // size of leaf pool & leafsets
289         uint leaftotal;                         // number of leaf latch entries
290         uint leafhash;                          // number of leaf hash table slots
291         uint leafvictim;                        // next leaf entry to examine
292         uint leafpromote;                       // next leaf entry to promote
293         uint redopage;                          // page number of redo buffer
294         uint redolast;                          // last msync size of recovery buff
295         uint redoend;                           // eof/end element in recovery buff
296         int err;                                        // last error
297         int line;                                       // last error line no
298         int found;                                      // number of keys found by delete
299         int reads, writes;                      // number of reads and writes
300 #ifndef unix
301         HANDLE halloc;                          // allocation handle
302         HANDLE hpool;                           // buffer pool handle
303 #endif
304         volatile uint segments;         // number of memory mapped segments
305         unsigned char *pages[64000];// memory mapped segments of b-tree
306 } BtMgr;
307
308 typedef struct {
309         BtMgr *mgr;                                     // buffer manager for entire process
310         BtMgr *main;                            // buffer manager for main btree
311         BtPageSet cacheset[1];  // cached page frame for cache btree
312         BtPageSet mainset[1];   // cached page frame for main btree
313         uint cacheslot;                         // slot number in cacheset
314         uint mainslot;                          // slot number in mainset
315         ushort phase;                           // 1 = main btree 0 = cache btree 2 = both
316         ushort thread_no;                       // thread number
317         BtSlot *cachenode;
318         BtSlot *mainnode;
319         BtKey *cachekey;
320         BtKey *mainkey;
321         BtVal *cacheval;
322         BtVal *mainval;
323 } BtDb;
324
325 //      atomic txn structure
326
327 typedef struct {
328         logseqno reqlsn;        // redo log seq no required
329         uint entry:31;          // latch table entry number
330         uint reuse:1;           // reused previous page
331         uint slot;                      // slot on page
332 } AtomicTxn;
333
334 //      Catastrophic errors
335
336 typedef enum {
337         BTERR_ok = 0,
338         BTERR_struct,
339         BTERR_ovflw,
340         BTERR_lock,
341         BTERR_map,
342         BTERR_read,
343         BTERR_wrt,
344         BTERR_atomic,
345         BTERR_recovery
346 } BTERR;
347
348 #define CLOCK_bit 0x8000
349
350 // recovery manager entry types
351
352 typedef enum {
353         BTRM_eof = 0,   // rest of buffer is emtpy
354         BTRM_add,               // add a unique key-value to btree
355         BTRM_dup,               // add a duplicate key-value to btree
356         BTRM_del,               // delete a key-value from btree
357         BTRM_upd,               // update a key with a new value
358         BTRM_new,               // allocate a new empty page
359         BTRM_old                // reuse an old empty page
360 } BTRM;
361
362 // recovery manager entry
363 //      structure followed by BtKey & BtVal
364
365 typedef struct {
366         logseqno reqlsn;        // log sequence number required
367         logseqno lsn;           // log sequence number for entry
368         uint len;                       // length of entry
369         unsigned char type;     // type of entry
370         unsigned char lvl;      // level of btree entry pertains to
371 } BtLogHdr;
372
373 // B-Tree functions
374
375 extern void bt_close (BtDb *bt);
376 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
377 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
378 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
379 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
380 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
381 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
382 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
383
384 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
385
386 extern BTERR bt_startkey (BtDb *db, unsigned char *key, uint len);
387 extern BTERR bt_nextkey (BtDb *bt);
388
389 extern uint bt_lastkey (BtDb *bt);
390 extern uint bt_prevkey (BtDb *bt);
391
392 //      manager functions
393 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
394 extern void bt_mgrclose (BtMgr *mgr);
395 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
396 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
397
398 //      atomic transaction functions
399 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
400 BTERR bt_promote (BtDb *bt);
401
402 //  The page is allocated from low and hi ends.
403 //  The key slots are allocated from the bottom,
404 //      while the text and value of the key
405 //  are allocated from the top.  When the two
406 //  areas meet, the page is split into two.
407
408 //  A key consists of a length byte, two bytes of
409 //  index number (0 - 65535), and up to 253 bytes
410 //  of key value.
411
412 //  Associated with each key is a value byte string
413 //      containing any value desired.
414
415 //  The b-tree root is always located at page 1.
416 //      The first leaf page of level zero is always
417 //      located on page 2.
418
419 //      The b-tree pages are linked with next
420 //      pointers to facilitate enumerators,
421 //      and provide for concurrency.
422
423 //      When to root page fills, it is split in two and
424 //      the tree height is raised by a new root at page
425 //      one with two keys.
426
427 //      Deleted keys are marked with a dead bit until
428 //      page cleanup. The fence key for a leaf node is
429 //      always present
430
431 //  To achieve maximum concurrency one page is locked at a time
432 //  as the tree is traversed to find leaf key in question. The right
433 //  page numbers are used in cases where the page is being split,
434 //      or consolidated.
435
436 //  Page 0 is dedicated to lock for new page extensions,
437 //      and chains empty pages together for reuse. It also
438 //      contains the latch manager hash table.
439
440 //      The ParentModification lock on a node is obtained to serialize posting
441 //      or changing the fence key for a node.
442
443 //      Empty pages are chained together through the ALLOC page and reused.
444
445 //      Access macros to address slot and key values from the page
446 //      Page slots use 1 based indexing.
447
448 #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
449 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
450 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
451
452 void bt_putid(unsigned char *dest, uid id)
453 {
454 int i = BtId;
455
456         while( i-- )
457                 dest[i] = (unsigned char)id, id >>= 8;
458 }
459
460 uid bt_getid(unsigned char *src)
461 {
462 uid id = 0;
463 int i;
464
465         for( i = 0; i < BtId; i++ )
466                 id <<= 8, id |= *src++; 
467
468         return id;
469 }
470
471 //      lite weight spin lock Latch Manager
472
473 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
474 {
475         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
476 }
477
478 void bt_mutexlock(MutexLatch *latch)
479 {
480 uint idx, waited = 0;
481 MutexLatch prev[1];
482
483  while( 1 ) {
484   for( idx = 0; idx < 100; idx++ ) {
485         *prev->value = __sync_fetch_and_or (latch->value, 1);
486         if( !*prev->bits->xcl ) {
487           if( waited )
488                 __sync_fetch_and_sub (latch->bits->waiters, 1);
489           return;
490         }
491   }
492
493   if( !waited ) {
494         __sync_fetch_and_add (latch->bits->waiters, 1);
495         *prev->bits->waiters += 1;
496         waited++;
497   }
498
499   sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
500  }
501 }
502
503 int bt_mutextry(MutexLatch *latch)
504 {
505         return !__sync_lock_test_and_set (latch->bits->xcl, 1);
506 }
507
508 void bt_releasemutex(MutexLatch *latch)
509 {
510 MutexLatch prev[1];
511
512         *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
513
514         if( *prev->bits->waiters )
515                 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
516 }
517
518 //      reader/writer lock implementation
519
520 void WriteLock (RWLock *lock, ushort tid, uint line)
521 {
522         if( lock->tid == tid ) {
523                 lock->dup++;
524                 return;
525         }
526         bt_mutexlock (lock->xcl);
527         bt_mutexlock (lock->wrt);
528         bt_releasemutex (lock->xcl);
529
530         lock->line = line;
531         lock->tid = tid;
532 }
533
534 void WriteRelease (RWLock *lock)
535 {
536         if( lock->dup ) {
537                 lock->dup--;
538                 return;
539         }
540         lock->tid = 0;
541         bt_releasemutex (lock->wrt);
542 }
543
544 void ReadLock (RWLock *lock, ushort tid)
545 {
546         bt_mutexlock (lock->xcl);
547
548         if( !__sync_fetch_and_add (&lock->readers, 1) )
549                 bt_mutexlock (lock->wrt);
550
551         bt_releasemutex (lock->xcl);
552 }
553
554 void ReadRelease (RWLock *lock)
555 {
556         if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
557                 bt_releasemutex (lock->wrt);
558 }
559
560 //      recovery manager -- flush dirty pages
561
562 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
563 {
564 uint cnt3 = 0, cnt2 = 0, cnt = 0;
565 uint entry, segment;
566 BtLatchSet *latch;
567 BtPage page;
568
569   //    flush dirty pool pages to the btree
570
571 fprintf(stderr, "Start flushlsn  ");
572   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
573         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
574         latch = mgr->latchsets + entry;
575         bt_mutexlock (latch->modify);
576         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
577
578         if( latch->dirty ) {
579           bt_writepage(mgr, page, latch->page_no, 0);
580           latch->dirty = 0, cnt++;
581         }
582 if( latch->pin & ~CLOCK_bit )
583 cnt2++;
584         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
585         bt_releasemutex (latch->modify);
586   }
587   for( entry = 1; entry < mgr->leaftotal; entry++ ) {
588         page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
589         latch = mgr->leafsets + entry;
590         bt_mutexlock (latch->modify);
591         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
592
593         if( latch->dirty ) {
594           bt_writepage(mgr, page, latch->page_no, 1);
595           latch->dirty = 0, cnt++;
596         }
597 if( latch->pin & ~CLOCK_bit )
598 cnt2++;
599         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
600         bt_releasemutex (latch->modify);
601   }
602 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
603 fprintf(stderr, "begin sync");
604         for( segment = 0; segment < mgr->segments; segment++ )
605           if( msync (mgr->pages[segment], (uid)65536 << mgr->page_bits, MS_SYNC) < 0 )
606                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
607 fprintf(stderr, " end sync\n");
608 }
609
610 //      recovery manager -- process current recovery buff on startup
611 //      this won't do much if previous session was properly closed.
612
613 BTERR bt_recoveryredo (BtMgr *mgr)
614 {
615 BtLogHdr *hdr, *eof;
616 uint offset = 0;
617 BtKey *key;
618 BtVal *val;
619
620   hdr = (BtLogHdr *)mgr->redobuff;
621   mgr->flushlsn = hdr->lsn;
622
623   while( 1 ) {
624         hdr = (BtLogHdr *)(mgr->redobuff + offset);
625         switch( hdr->type ) {
626         case BTRM_eof:
627                 mgr->lsn = hdr->lsn;
628                 return 0;
629         case BTRM_add:          // add a unique key-value to btree
630         
631         case BTRM_dup:          // add a duplicate key-value to btree
632         case BTRM_del:          // delete a key-value from btree
633         case BTRM_upd:          // update a key with a new value
634         case BTRM_new:          // allocate a new empty page
635         case BTRM_old:          // reuse an old empty page
636                 return 0;
637         }
638   }
639 }
640
641 //      recovery manager -- append new entry to recovery log
642 //      flush dirty pages to disk when it overflows.
643
644 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
645 {
646 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
647 uint amt = sizeof(BtLogHdr);
648 BtLogHdr *hdr, *eof;
649 uint last, end;
650
651         bt_mutexlock (mgr->redo);
652
653         if( key )
654           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
655
656         //      see if new entry fits in buffer
657         //      flush and reset if it doesn't
658
659         if( amt > size - mgr->redoend ) {
660           mgr->flushlsn = mgr->lsn;
661           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
662                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
663           mgr->redolast = 0;
664           mgr->redoend = 0;
665           bt_flushlsn(mgr, thread_no);
666         }
667
668         //      fill in new entry & either eof or end block
669
670         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
671
672         hdr->len = amt;
673         hdr->type = type;
674         hdr->lvl = lvl;
675         hdr->lsn = ++mgr->lsn;
676
677         mgr->redoend += amt;
678
679         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
680         memset (eof, 0, sizeof(BtLogHdr));
681
682         //  fill in key and value
683
684         if( key ) {
685           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
686           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
687         }
688
689         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
690         memset (eof, 0, sizeof(BtLogHdr));
691         eof->lsn = mgr->lsn;
692
693         last = mgr->redolast & ~0xfff;
694         end = mgr->redoend;
695
696         if( end - last + sizeof(BtLogHdr) >= 32768 )
697           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
698                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
699           else
700                 mgr->redolast = end;
701
702         bt_releasemutex(mgr->redo);
703         return hdr->lsn;
704 }
705
706 //      recovery manager -- append transaction to recovery log
707 //      flush dirty pages to disk when it overflows.
708
709 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
710 {
711 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
712 uint amt = 0, src, type;
713 BtLogHdr *hdr, *eof;
714 uint last, end;
715 logseqno lsn;
716 BtKey *key;
717 BtVal *val;
718
719         //      determine amount of redo recovery log space required
720
721         for( src = 0; src++ < source->cnt; ) {
722           key = keyptr(source,src);
723           val = valptr(source,src);
724           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
725           amt += sizeof(BtLogHdr);
726         }
727
728         bt_mutexlock (mgr->redo);
729
730         //      see if new entry fits in buffer
731         //      flush and reset if it doesn't
732
733         if( amt > size - mgr->redoend ) {
734           mgr->flushlsn = mgr->lsn;
735           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
736                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
737           mgr->redolast = 0;
738           mgr->redoend = 0;
739           bt_flushlsn (mgr, thread_no);
740         }
741
742         //      assign new lsn to transaction
743
744         lsn = ++mgr->lsn;
745
746         //      fill in new entries
747
748         for( src = 0; src++ < source->cnt; ) {
749           key = keyptr(source, src);
750           val = valptr(source, src);
751
752           switch( slotptr(source, src)->type ) {
753           case Unique:
754                 type = BTRM_add;
755                 break;
756           case Duplicate:
757                 type = BTRM_dup;
758                 break;
759           case Delete:
760                 type = BTRM_del;
761                 break;
762           }
763
764           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
765           amt += sizeof(BtLogHdr);
766
767           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
768           hdr->len = amt;
769           hdr->type = type;
770           hdr->lsn = lsn;
771           hdr->lvl = 0;
772
773           //  fill in key and value
774
775           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
776           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
777
778           mgr->redoend += amt;
779         }
780
781         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
782         memset (eof, 0, sizeof(BtLogHdr));
783         eof->lsn = lsn;
784
785         last = mgr->redolast & ~0xfff;
786         end = mgr->redoend;
787
788         if( end - last + sizeof(BtLogHdr) >= 32768 )
789           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
790                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
791           else
792                 mgr->redolast = end;
793
794         bt_releasemutex(mgr->redo);
795         return lsn;
796 }
797
798 //      sync a single btree page to disk
799
800 BTERR bt_syncpage (BtMgr *mgr, BtPage page, BtLatchSet *latch)
801 {
802 uint segment = latch->page_no >> 16;
803 uint page_size = mgr->page_size;
804 BtPage perm;
805
806         if( bt_writepage (mgr, page, latch->page_no, latch->leaf) )
807                 return mgr->err;
808
809         if( !page->lvl )
810                 page_size <<= mgr->leaf_xtra;
811
812         perm = (BtPage)(mgr->pages[segment] + ((latch->page_no & 0xffff) << mgr->page_bits));
813
814         if( msync (perm, page_size, MS_SYNC) < 0 )
815           fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
816
817         latch->dirty = 0;
818         return 0;
819 }
820
821 //      read page into buffer pool from permanent location in Btree file
822
823 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
824 {
825 int flag = PROT_READ | PROT_WRITE;
826 uint page_size = mgr->page_size;
827 uint off = 0, segment, fragment;
828 unsigned char *perm;
829
830   if( leaf )
831         page_size <<= mgr->leaf_xtra;
832
833   fragment = page_no & 0xffff;
834   segment = page_no >> 16;
835   mgr->reads++;
836
837   while( off < page_size ) {
838         if( fragment >> 16 )
839                 segment++, fragment = 0;
840
841         if( segment < mgr->segments ) {
842           perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
843
844           memcpy ((unsigned char *)page + off, perm, mgr->page_size);
845           off += mgr->page_size;
846           fragment++;
847           continue;
848         }
849
850         bt_mutexlock (mgr->maps);
851
852         if( segment < mgr->segments ) {
853                 bt_releasemutex (mgr->maps);
854                 continue;
855         }
856
857         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
858         mgr->segments++;
859
860         bt_releasemutex (mgr->maps);
861   }
862
863   return 0;
864 }
865
866 //      write page to permanent location in Btree file
867
868 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
869 {
870 int flag = PROT_READ | PROT_WRITE;
871 uint page_size = mgr->page_size;
872 uint off = 0, segment, fragment;
873 unsigned char *perm;
874
875   if( leaf )
876         page_size <<= mgr->leaf_xtra;
877
878   fragment = page_no & 0xffff;
879   segment = page_no >> 16;
880   mgr->writes++;
881
882   while( off < page_size ) {
883         if( fragment >> 16 )
884                 segment++, fragment = 0;
885
886         if( segment < mgr->segments ) {
887           perm = mgr->pages[segment] + ((uid)fragment << mgr->page_bits);
888           memcpy (perm, (unsigned char *)page + off, mgr->page_size);
889           off += mgr->page_size;
890           fragment++;
891           continue;
892         }
893
894         bt_mutexlock (mgr->maps);
895
896         if( segment < mgr->segments ) {
897                 bt_releasemutex (mgr->maps);
898                 continue;
899         }
900
901         mgr->pages[mgr->segments] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << (mgr->page_bits + 16));
902         mgr->segments++;
903         bt_releasemutex (mgr->maps);
904   }
905
906   return 0;
907 }
908
909 //      set CLOCK bit in latch
910 //      decrement pin count
911
912 int value;
913
914 void bt_unpinlatch (BtLatchSet *latch, ushort thread_no, uint line)
915 {
916         bt_mutexlock(latch->modify);
917         latch->pin |= CLOCK_bit;
918         latch->pin--;
919
920         bt_releasemutex(latch->modify);
921 }
922
923 //  return the btree cached page address
924
925 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
926 {
927 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
928 BtPage page;
929
930   if( latch->leaf )
931         page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
932   else
933         page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
934
935   return page;
936 }
937
938 //  return next available leaf entry
939 //        and with latch entry locked
940
941 uint bt_availleaf (BtMgr *mgr)
942 {
943 BtLatchSet *latch;
944 uint entry;
945
946   while( 1 ) {
947 #ifdef unix
948         entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
949 #else
950         entry = _InterlockedIncrement (&mgr->leafvictim);
951 #endif
952         entry %= mgr->leaftotal;
953
954         if( !entry )
955                 continue;
956
957         latch = mgr->leafsets + entry;
958
959         if( !bt_mutextry(latch->modify) )
960                 continue;
961
962         //  return this entry if it is not pinned
963
964         if( !latch->pin )
965                 return entry;
966
967         //      if the CLOCK bit is set
968         //      reset it to zero.
969
970         latch->pin &= ~CLOCK_bit;
971         bt_releasemutex(latch->modify);
972   }
973 }
974
975 //  return next available latch entry
976 //        and with latch entry locked
977
978 uint bt_availnext (BtMgr *mgr)
979 {
980 BtLatchSet *latch;
981 uint entry;
982
983   while( 1 ) {
984 #ifdef unix
985         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
986 #else
987         entry = _InterlockedIncrement (&mgr->latchvictim);
988 #endif
989         entry %= mgr->latchtotal;
990
991         if( !entry )
992                 continue;
993
994         latch = mgr->latchsets + entry;
995
996         if( !bt_mutextry(latch->modify) )
997                 continue;
998
999         //  return this entry if it is not pinned
1000
1001         if( !latch->pin )
1002                 return entry;
1003
1004         //      if the CLOCK bit is set
1005         //      reset it to zero.
1006
1007         latch->pin &= ~CLOCK_bit;
1008         bt_releasemutex(latch->modify);
1009   }
1010 }
1011
1012 //      pin leaf in leaf buffer pool
1013 //      return with latchset pinned
1014
1015 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
1016 {
1017 uint hashidx = page_no % mgr->leafhash;
1018 BtLatchSet *latch;
1019 uint entry, idx;
1020 BtPage page;
1021
1022   //  try to find our entry
1023
1024   bt_mutexlock(mgr->leaftable[hashidx].latch);
1025
1026   if( entry = mgr->leaftable[hashidx].entry )
1027    do {
1028         latch = mgr->leafsets + entry;
1029
1030         if( page_no == latch->page_no )
1031                 break;
1032    } while( entry = latch->next );
1033
1034   //  found our entry: increment pin
1035
1036   if( entry ) {
1037         latch = mgr->leafsets + entry;
1038         bt_mutexlock(latch->modify);
1039         latch->pin |= CLOCK_bit;
1040         latch->pin++;
1041
1042         bt_releasemutex(latch->modify);
1043         bt_releasemutex(mgr->leaftable[hashidx].latch);
1044         return latch;
1045   }
1046
1047   //  find and reuse unpinned entry
1048
1049 trynext:
1050   entry = bt_availleaf (mgr);
1051   latch = mgr->leafsets + entry;
1052
1053   idx = latch->page_no % mgr->leafhash;
1054
1055   //  if latch is on a different hash chain
1056   //    unlink from the old page_no chain
1057
1058   if( latch->page_no )
1059    if( idx != hashidx ) {
1060
1061         //  skip over this entry if latch not available
1062
1063     if( !bt_mutextry (mgr->leaftable[idx].latch) ) {
1064           bt_releasemutex(latch->modify);
1065           goto trynext;
1066         }
1067
1068         if( latch->prev )
1069           mgr->leafsets[latch->prev].next = latch->next;
1070         else
1071           mgr->leaftable[idx].entry = latch->next;
1072
1073         if( latch->next )
1074           mgr->leafsets[latch->next].prev = latch->prev;
1075
1076     bt_releasemutex (mgr->leaftable[idx].latch);
1077    }
1078
1079   page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1080
1081   //  update permanent page area in btree from buffer pool
1082   //    no read-lock is required since page is not pinned.
1083
1084   if( latch->dirty )
1085         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
1086           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1087         else
1088           latch->dirty = 0;
1089
1090   if( bt_readpage (mgr, page, page_no, 1) )
1091         return mgr->line = __LINE__, NULL;
1092
1093   //  link page as head of hash table chain
1094   //  if this is a never before used entry,
1095   //  or it was previously on a different
1096   //  hash table chain. Otherwise, just
1097   //  leave it in its current hash table
1098   //  chain position.
1099
1100   if( !latch->page_no || hashidx != idx ) {
1101         if( latch->next = mgr->leaftable[hashidx].entry )
1102           mgr->leafsets[latch->next].prev = entry;
1103
1104         mgr->leaftable[hashidx].entry = entry;
1105     latch->prev = 0;
1106   }
1107
1108   //  fill in latch structure
1109
1110   latch->pin = CLOCK_bit | 1;
1111   latch->page_no = page_no;
1112   latch->leaf = 1;
1113
1114   bt_releasemutex (latch->modify);
1115   bt_releasemutex (mgr->leaftable[hashidx].latch);
1116   return latch;
1117 }
1118
1119 //      pin page in non-leaf buffer pool
1120 //      return with latchset pinned
1121
1122 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1123 {
1124 uint hashidx = page_no % mgr->latchhash;
1125 BtLatchSet *latch;
1126 uint entry, idx;
1127 BtPage page;
1128
1129   //  try to find our entry
1130
1131   bt_mutexlock(mgr->hashtable[hashidx].latch);
1132
1133   if( entry = mgr->hashtable[hashidx].entry ) do
1134   {
1135         latch = mgr->latchsets + entry;
1136
1137         if( page_no == latch->page_no )
1138                 break;
1139   } while( entry = latch->next );
1140
1141   //  found our entry: increment pin
1142
1143   if( entry ) {
1144         latch = mgr->latchsets + entry;
1145         bt_mutexlock(latch->modify);
1146         latch->pin |= CLOCK_bit;
1147         latch->pin++;
1148         bt_releasemutex(latch->modify);
1149         bt_releasemutex(mgr->hashtable[hashidx].latch);
1150         return latch;
1151   }
1152
1153   //  find and reuse unpinned entry
1154
1155 trynext:
1156   entry = bt_availnext (mgr);
1157   latch = mgr->latchsets + entry;
1158
1159   idx = latch->page_no % mgr->latchhash;
1160
1161   //  if latch is on a different hash chain
1162   //    unlink from the old page_no chain
1163
1164   if( latch->page_no )
1165    if( idx != hashidx ) {
1166
1167         //  skip over this entry if latch not available
1168
1169     if( !bt_mutextry (mgr->hashtable[idx].latch) ) {
1170           bt_releasemutex(latch->modify);
1171           goto trynext;
1172         }
1173
1174         if( latch->prev )
1175           mgr->latchsets[latch->prev].next = latch->next;
1176         else
1177           mgr->hashtable[idx].entry = latch->next;
1178
1179         if( latch->next )
1180           mgr->latchsets[latch->next].prev = latch->prev;
1181
1182     bt_releasemutex (mgr->hashtable[idx].latch);
1183    }
1184
1185   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1186
1187   //  update permanent page area in btree from buffer pool
1188   //    no read-lock is required since page is not pinned.
1189
1190   if( latch->dirty )
1191         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1192           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1193         else
1194           latch->dirty = 0;
1195
1196   if( bt_readpage (mgr, page, page_no, 0) )
1197         return mgr->line = __LINE__, NULL;
1198
1199   //  link page as head of hash table chain
1200   //  if this is a never before used entry,
1201   //  or it was previously on a different
1202   //  hash table chain. Otherwise, just
1203   //  leave it in its current hash table
1204   //  chain position.
1205
1206   if( !latch->page_no || hashidx != idx ) {
1207         if( latch->next = mgr->hashtable[hashidx].entry )
1208           mgr->latchsets[latch->next].prev = entry;
1209
1210         mgr->hashtable[hashidx].entry = entry;
1211     latch->prev = 0;
1212   }
1213
1214   //  fill in latch structure
1215
1216   latch->pin = CLOCK_bit | 1;
1217   latch->page_no = page_no;
1218   latch->leaf = 0;
1219
1220   bt_releasemutex (latch->modify);
1221   bt_releasemutex (mgr->hashtable[hashidx].latch);
1222   return latch;
1223 }
1224   
1225 void bt_mgrclose (BtMgr *mgr)
1226 {
1227 BtLatchSet *latch;
1228 BtLogHdr *eof;
1229 uint num = 0;
1230 BtPage page;
1231 uint slot;
1232
1233         //      flush previously written dirty pages
1234         //      and write recovery buffer to disk
1235
1236         fdatasync (mgr->idx);
1237
1238         if( mgr->redoend ) {
1239                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1240                 memset (eof, 0, sizeof(BtLogHdr));
1241         }
1242
1243         //      write remaining dirty pool pages to the btree
1244
1245         for( slot = 1; slot < mgr->latchtotal; slot++ ) {
1246                 page = (BtPage)(((uid)slot << mgr->page_bits) + mgr->pagepool);
1247                 latch = mgr->latchsets + slot;
1248
1249                 if( latch->dirty ) {
1250                         bt_writepage(mgr, page, latch->page_no, 0);
1251                         latch->dirty = 0, num++;
1252                 }
1253         }
1254
1255         //      write remaining dirty leaf pages to the btree
1256
1257         for( slot = 1; slot < mgr->leaftotal; slot++ ) {
1258                 page = (BtPage)(((uid)slot << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1259                 latch = mgr->leafsets + slot;
1260
1261                 if( latch->dirty ) {
1262                         bt_writepage(mgr, page, latch->page_no, 1);
1263                         latch->dirty = 0, num++;
1264                 }
1265         }
1266
1267         //      clear redo recovery buffer on disk.
1268
1269         if( mgr->pagezero->redopages ) {
1270                 eof = (BtLogHdr *)mgr->redobuff;
1271                 memset (eof, 0, sizeof(BtLogHdr));
1272                 eof->lsn = mgr->lsn;
1273                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1274                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1275         }
1276
1277         fprintf(stderr, "%d buffer pool pages flushed\n", num);
1278
1279 #ifdef unix
1280         while( mgr->segments )
1281                 munmap (mgr->pages[--mgr->segments], (uid)65536 << mgr->page_bits);
1282
1283         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1284         munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1285         munmap (mgr->pagezero, mgr->page_size);
1286 #else
1287         FlushViewOfFile(mgr->pagezero, 0);
1288         UnmapViewOfFile(mgr->pagezero);
1289         UnmapViewOfFile(mgr->pagepool);
1290         CloseHandle(mgr->halloc);
1291         CloseHandle(mgr->hpool);
1292 #endif
1293 #ifdef unix
1294         close (mgr->idx);
1295         free (mgr);
1296 #else
1297         VirtualFree (mgr->redobuff, 0, MEM_RELEASE);
1298         FlushFileBuffers(mgr->idx);
1299         CloseHandle(mgr->idx);
1300         GlobalFree (mgr);
1301 #endif
1302 }
1303
1304 //      close and release memory
1305
1306 void bt_close (BtDb *bt)
1307 {
1308         free (bt);
1309 }
1310
1311 //  open/create new btree buffer manager
1312
1313 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1314 //              size of page pool (e.g. 262144) and leaf pool
1315
1316 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1317 {
1318 uint lvl, attr, last, slot, idx;
1319 uint nlatchpage, latchhash;
1320 uint nleafpage, leafhash;
1321 unsigned char value[BtId];
1322 int flag, initit = 0;
1323 BtPageZero *pagezero;
1324 BtLatchSet *latch;
1325 off64_t size;
1326 uint amt[1];
1327 BtMgr* mgr;
1328 BtKey* key;
1329 BtVal *val;
1330
1331         // determine sanity of page size and buffer pool
1332
1333         if( leafxtra + pagebits > BT_maxbits )
1334                 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1335
1336         if( pagebits < BT_minbits )
1337                 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1338
1339 #ifdef unix
1340         mgr = calloc (1, sizeof(BtMgr));
1341
1342         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1343
1344         if( mgr->idx == -1 ) {
1345                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1346                 return free(mgr), NULL;
1347         }
1348 #else
1349         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1350         attr = FILE_ATTRIBUTE_NORMAL;
1351         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1352
1353         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1354                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1355                 return GlobalFree(mgr), NULL;
1356         }
1357 #endif
1358
1359 #ifdef unix
1360         pagezero = valloc (BT_maxpage);
1361         *amt = 0;
1362
1363         // read minimum page size to get root info
1364         //      to support raw disk partition files
1365         //      check if page_bits == 0 on the disk.
1366
1367         if( size = lseek (mgr->idx, 0L, 2) )
1368                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1369                         if( pagezero->page_bits ) {
1370                                 pagebits = pagezero->page_bits;
1371                                 leafxtra = pagezero->leaf_xtra;
1372                         } else
1373                                 initit = 1;
1374                 else
1375                         return free(mgr), free(pagezero), NULL;
1376         else
1377                 initit = 1;
1378 #else
1379         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1380         size = GetFileSize(mgr->idx, amt);
1381
1382         if( size || *amt ) {
1383                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1384                         return bt_mgrclose (mgr), NULL;
1385                 pagebits = pagezero->page_bits;
1386                 leafxtra = pagezero->leaf_xtra;
1387         } else
1388                 initit = 1;
1389 #endif
1390
1391         mgr->page_size = 1 << pagebits;
1392         mgr->page_bits = pagebits;
1393         mgr->leaf_xtra = leafxtra;
1394
1395         //  calculate number of latch hash table entries
1396
1397         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1398
1399         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1400         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1401         mgr->latchtotal = nodemax;
1402
1403         //  calculate number of leaf hash table entries
1404
1405         mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1406
1407         mgr->nleafpage += leafmax << leafxtra;          // size of the leaf pool in pages
1408         mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1409         mgr->leaftotal = leafmax;
1410
1411         mgr->redopage = LEAF_page + (1 << leafxtra);
1412
1413         if( !initit )
1414                 goto mgrlatch;
1415
1416         // initialize an empty b-tree with latch page, root page, page of leaves
1417         // and page(s) of latches and page pool cache
1418
1419         ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1420         memset (pagezero, 0, 1 << pagebits);
1421         pagezero->alloc->lvl = MIN_lvl - 1;
1422         pagezero->redopages = redopages;
1423         pagezero->page_bits = mgr->page_bits;
1424         pagezero->leaf_xtra = leafxtra;
1425
1426         bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
1427         pagezero->upperpages = 1;
1428         pagezero->leafpages = 1;
1429
1430         //  initialize left-most LEAF page in
1431         //      alloc->left and count of active leaf pages.
1432
1433         bt_putid (pagezero->alloc->left, LEAF_page);
1434
1435         if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1436                 fprintf (stderr, "Unable to create btree page zero\n");
1437                 return bt_mgrclose (mgr), NULL;
1438         }
1439
1440         memset (pagezero, 0, 1 << pagebits);
1441
1442         for( lvl=MIN_lvl; lvl--; ) {
1443         BtSlot *node = slotptr(pagezero->alloc, 1);
1444                 node->off = mgr->page_size;
1445
1446                 if( !lvl )
1447                         node->off <<= mgr->leaf_xtra;
1448
1449                 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1450                 node->type = Librarian;
1451                 node++->dead = 1;
1452
1453                 node->off = node[-1].off;
1454                 key = keyptr(pagezero->alloc, 2);
1455                 key = keyptr(pagezero->alloc, 1);
1456                 key->len = 2;           // create stopper key
1457                 key->key[0] = 0xff;
1458                 key->key[1] = 0xff;
1459
1460                 bt_putid(value, MIN_lvl - lvl + 1);
1461                 val = valptr(pagezero->alloc, 1);
1462                 val->len = lvl ? BtId : 0;
1463                 memcpy (val->value, value, val->len);
1464
1465                 pagezero->alloc->min = node->off;
1466                 pagezero->alloc->lvl = lvl;
1467                 pagezero->alloc->cnt = 2;
1468                 pagezero->alloc->act = 1;
1469                 pagezero->alloc->page_no = MIN_lvl - lvl;
1470
1471                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1472                         fprintf (stderr, "Unable to create btree page\n");
1473                         return bt_mgrclose (mgr), NULL;
1474                 }
1475         }
1476
1477 mgrlatch:
1478 #ifdef unix
1479         free (pagezero);
1480 #else
1481         VirtualFree (pagezero, 0, MEM_RELEASE);
1482 #endif
1483
1484         // mlock the first segment of 64K pages
1485
1486         flag = PROT_READ | PROT_WRITE;
1487         mgr->pages[0] = mmap (0, (uid)65536 << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1488         mgr->segments = 1;
1489
1490         if( mgr->pages[0] == MAP_FAILED ) {
1491                 fprintf (stderr, "Unable to mmap first btree segment, error = %d\n", errno);
1492                 return bt_mgrclose (mgr), NULL;
1493         }
1494
1495         mgr->pagezero = (BtPageZero *)mgr->pages[0];
1496         mlock (mgr->pagezero, mgr->page_size);
1497
1498         mgr->redobuff = mgr->pages[0] + (mgr->redopage << mgr->page_bits);
1499         mlock (mgr->redobuff, mgr->pagezero->redopages << mgr->page_bits);
1500
1501         //      allocate pool buffers
1502
1503         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1504         if( mgr->pagepool == MAP_FAILED ) {
1505                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1506                 return bt_mgrclose (mgr), NULL;
1507         }
1508
1509         mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1510         if( mgr->leafpool == MAP_FAILED ) {
1511                 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1512                 return bt_mgrclose (mgr), NULL;
1513         }
1514
1515         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1516         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1517         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1518
1519         mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1520         mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1521         mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1522
1523         return mgr;
1524 }
1525
1526 //      open BTree access method
1527 //      based on buffer manager
1528
1529 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1530 {
1531 BtDb *bt = malloc (sizeof(*bt));
1532
1533         memset (bt, 0, sizeof(*bt));
1534         bt->main = main;
1535         bt->mgr = mgr;
1536 #ifdef unix
1537         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1538 #else
1539         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1540 #endif
1541         return bt;
1542 }
1543
1544 //  compare two keys, return > 0, = 0, or < 0
1545 //  =0: keys are same
1546 //  -1: key2 > key1
1547 //  +1: key2 < key1
1548 //  as the comparison value
1549
1550 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1551 {
1552 uint len1 = key1->len;
1553 int ans;
1554
1555         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1556                 return ans;
1557
1558         if( len1 > len2 )
1559                 return 1;
1560         if( len1 < len2 )
1561                 return -1;
1562
1563         return 0;
1564 }
1565
1566 // place write, read, or parent lock on requested page_no.
1567
1568 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1569 {
1570         switch( mode ) {
1571         case BtLockRead:
1572                 ReadLock (latch->readwr, thread_no);
1573                 break;
1574         case BtLockWrite:
1575                 WriteLock (latch->readwr, thread_no, line);
1576                 break;
1577         case BtLockAccess:
1578                 ReadLock (latch->access, thread_no);
1579                 break;
1580         case BtLockDelete:
1581                 WriteLock (latch->access, thread_no, line);
1582                 break;
1583         case BtLockParent:
1584                 WriteLock (latch->parent, thread_no, line);
1585                 break;
1586         case BtLockLink:
1587                 WriteLock (latch->link, thread_no, line);
1588                 break;
1589         }
1590 }
1591
1592 // remove write, read, or parent lock on requested page
1593
1594 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1595 {
1596         switch( mode ) {
1597         case BtLockRead:
1598                 ReadRelease (latch->readwr);
1599                 break;
1600         case BtLockWrite:
1601                 WriteRelease (latch->readwr);
1602                 break;
1603         case BtLockAccess:
1604                 ReadRelease (latch->access);
1605                 break;
1606         case BtLockDelete:
1607                 WriteRelease (latch->access);
1608                 break;
1609         case BtLockParent:
1610                 WriteRelease (latch->parent);
1611                 break;
1612         case BtLockLink:
1613                 WriteRelease (latch->link);
1614                 break;
1615         }
1616 }
1617
1618 //      allocate a new page
1619 //      return with page latched, but unlocked.
1620
1621 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1622 {
1623 uint page_size = mgr->page_size, page_xtra = 0;
1624 unsigned char *freechain;
1625 uid page_no;
1626
1627         if( contents->lvl ) {
1628                 freechain = mgr->pagezero->freechain;
1629                 mgr->pagezero->upperpages++;
1630         } else {
1631                 freechain = mgr->pagezero->leafchain;
1632                 mgr->pagezero->leafpages++;
1633                 page_xtra = mgr->leaf_xtra;
1634                 page_size <<= page_xtra;
1635         }
1636
1637         //      lock allocation page
1638
1639         bt_mutexlock(mgr->lock);
1640
1641         // use empty chain first
1642         // else allocate new page
1643
1644         if( page_no = bt_getid(freechain) ) {
1645                 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1646                         set->page = bt_mappage (mgr, set->latch);
1647                 else
1648                         return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1649
1650                 bt_putid(freechain, bt_getid(set->page->right));
1651
1652                 //  the page is currently nopromote and this
1653                 //  will keep bt_promote out.
1654
1655                 //      contents will replace this bit
1656                 //  and pin will keep bt_promote out
1657
1658                 contents->page_no = page_no;
1659                 contents->nopromote = 0;
1660                 set->latch->dirty = 1;
1661
1662                 memcpy (set->page, contents, page_size);
1663
1664 //              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1665 //                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1666
1667                 bt_releasemutex(mgr->lock);
1668                 return 0;
1669         }
1670
1671         page_no = bt_getid(mgr->pagezero->alloc->right);
1672         bt_putid(mgr->pagezero->alloc->right, page_no+(1 << page_xtra));
1673
1674         // unlock allocation latch and
1675         //      extend file into new page.
1676
1677 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1678 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1679         bt_releasemutex(mgr->lock);
1680
1681         //      keep bt_promote out of this page
1682         contents->nopromote = 1;
1683         contents->page_no = page_no;
1684         if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1685                 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1686         if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1687                 set->page = bt_mappage (mgr, set->latch);
1688         else
1689                 return mgr->err_thread = thread_no, mgr->err;
1690
1691         // now pin will keep bt_promote out
1692
1693         set->page->nopromote = 0;
1694         set->latch->dirty = 1;
1695         return 0;
1696 }
1697
1698 //  find slot in page for given key at a given level
1699
1700 int bt_findslot (BtPage page, unsigned char *key, uint len)
1701 {
1702 uint diff, higher = page->cnt, low = 1, slot;
1703 uint good = 0;
1704
1705         //        make stopper key an infinite fence value
1706
1707         if( bt_getid (page->right) )
1708                 higher++;
1709         else
1710                 good++;
1711
1712         //      low is the lowest candidate.
1713         //  loop ends when they meet
1714
1715         //  higher is already
1716         //      tested as .ge. the passed key.
1717
1718         while( diff = higher - low ) {
1719                 slot = low + ( diff >> 1 );
1720                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1721                         low = slot + 1;
1722                 else
1723                         higher = slot, good++;
1724         }
1725
1726         //      return zero if key is on right link page
1727
1728         return good ? higher : 0;
1729 }
1730
1731 //  find and load page at given level for given key
1732 //      leave page rd or wr locked as requested
1733
1734 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1735 {
1736 uid page_no = ROOT_page, prevpage_no = 0;
1737 uint drill = 0xff, slot;
1738 BtLatchSet *prevlatch;
1739 uint mode, prevmode;
1740 BtPage prevpage;
1741 BtVal *val;
1742 BtKey *ptr;
1743
1744   //  start at root of btree and drill down
1745
1746   do {
1747         // determine lock mode of drill level
1748         mode = (drill == lvl) ? lock : BtLockRead; 
1749
1750         if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1751           set->page = bt_mappage (mgr, set->latch);
1752         else
1753           return 0;
1754
1755         // obtain access lock using lock chaining with Access mode
1756
1757         if( page_no > ROOT_page )
1758           bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1759
1760         //      release & unpin parent or left sibling page
1761
1762         if( prevpage_no ) {
1763           bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
1764           bt_unpinlatch (prevlatch, thread_no, __LINE__);
1765           prevpage_no = 0;
1766         }
1767
1768         // obtain mode lock using lock coupling through AccessLock
1769
1770         bt_lockpage(mode, set->latch, thread_no, __LINE__);
1771
1772         // grab our fence key
1773
1774         ptr=keyptr(set->page,set->page->cnt);
1775
1776         if( set->page->free )
1777                 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1778
1779         if( page_no > ROOT_page )
1780           bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1781
1782         // re-read and re-lock root after determining actual level of root
1783
1784         if( set->page->lvl != drill) {
1785                 if( set->latch->page_no != ROOT_page )
1786                         return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1787                         
1788                 drill = set->page->lvl;
1789
1790                 if( lock != BtLockRead && drill == lvl ) {
1791                   bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1792                   bt_unpinlatch (set->latch, thread_no, __LINE__);
1793                   continue;
1794                 }
1795         }
1796
1797         prevpage_no = set->latch->page_no;
1798         prevlatch = set->latch;
1799         prevpage = set->page;
1800         prevmode = mode;
1801
1802         //  if requested key is beyond our fence,
1803         //      slide to the right
1804
1805         if( keycmp (ptr, key, len) < 0 )
1806           if( page_no = bt_getid(set->page->right) )
1807                 continue;
1808
1809         //  if page is part of a delete operation,
1810         //      slide to the left;
1811
1812         if( set->page->kill ) {
1813           bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1814           page_no = bt_getid(set->page->left);
1815           bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1816           continue;
1817         }
1818
1819         //  find key on page at this level
1820         //  and descend to requested level
1821
1822         if( slot = bt_findslot (set->page, key, len) ) {
1823           if( drill == lvl )
1824                 return slot;
1825
1826           // find next non-dead slot -- the fence key if nothing else
1827
1828           while( slotptr(set->page, slot)->dead )
1829                 if( slot++ < set->page->cnt )
1830                   continue;
1831                 else
1832                   return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1833
1834           val = valptr(set->page, slot);
1835
1836           if( val->len == BtId )
1837                 page_no = bt_getid(val->value);
1838           else
1839                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1840
1841           drill--;
1842           continue;
1843          }
1844
1845         //  slide right into next page
1846
1847         page_no = bt_getid(set->page->right);
1848
1849   } while( page_no );
1850
1851   // return error on end of right chain
1852
1853   mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1854   return 0;     // return error
1855 }
1856
1857 //      return page to free list
1858 //      page must be delete & write locked
1859 //      and have no keys pointing to it.
1860
1861 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1862 {
1863 unsigned char *freechain;
1864
1865         if( set->page->lvl ) {
1866                 freechain = mgr->pagezero->freechain;
1867                 mgr->pagezero->upperpages--;
1868         } else {
1869                 freechain = mgr->pagezero->leafchain;
1870                 mgr->pagezero->leafpages--;
1871         }
1872
1873         //      lock allocation page
1874
1875         bt_mutexlock (mgr->lock);
1876
1877         //      store chain
1878
1879         memcpy(set->page->right, freechain, BtId);
1880         bt_putid(freechain, set->latch->page_no);
1881         set->latch->dirty = 1;
1882         set->page->free = 1;
1883
1884 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1885 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1886
1887         // unlock released page
1888         // and unlock allocation page
1889
1890         bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1891         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1892         bt_unpinlatch (set->latch, thread_no, __LINE__);
1893         bt_releasemutex (mgr->lock);
1894 }
1895
1896 //      a fence key was deleted from a page
1897 //      push new fence value upwards
1898
1899 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1900 {
1901 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1902 unsigned char value[BtId];
1903 BtKey* ptr;
1904 uint idx;
1905
1906         //      remove the old fence value
1907
1908         ptr = keyptr(set->page, set->page->cnt);
1909         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1910         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1911         set->latch->dirty = 1;
1912
1913         //  cache new fence value
1914
1915         ptr = keyptr(set->page, set->page->cnt);
1916         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1917
1918         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1919         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1920
1921         //      insert new (now smaller) fence key
1922
1923         bt_putid (value, set->latch->page_no);
1924         ptr = (BtKey*)leftkey;
1925
1926         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1927           return mgr->err_thread = thread_no, mgr->err;
1928
1929         //      now delete old fence key
1930
1931         ptr = (BtKey*)rightkey;
1932
1933         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1934                 return mgr->err_thread = thread_no, mgr->err;
1935
1936         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1937         bt_unpinlatch(set->latch, thread_no, __LINE__);
1938         return 0;
1939 }
1940
1941 //      root has a single child
1942 //      collapse a level from the tree
1943
1944 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1945 {
1946 BtPageSet child[1];
1947 uid page_no;
1948 BtVal *val;
1949 uint idx;
1950
1951   // find the child entry and promote as new root contents
1952
1953   do {
1954         for( idx = 0; idx++ < root->page->cnt; )
1955           if( !slotptr(root->page, idx)->dead )
1956                 break;
1957
1958         val = valptr(root->page, idx);
1959
1960         if( val->len == BtId )
1961                 page_no = bt_getid (valptr(root->page, idx)->value);
1962         else
1963                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1964
1965         if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1966                 child->page = bt_mappage (mgr, child->latch);
1967         else
1968                 return mgr->err_thread = thread_no, mgr->err;
1969
1970         bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1971         bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
1972
1973         memcpy (root->page, child->page, mgr->page_size);
1974         root->latch->dirty = 1;
1975
1976         bt_freepage (mgr, child, thread_no);
1977
1978   } while( root->page->lvl > 1 && root->page->act == 1 );
1979
1980   bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1981   bt_unpinlatch (root->latch, thread_no, __LINE__);
1982   return 0;
1983 }
1984
1985 //  delete a page and manage key
1986 //  call with page writelocked
1987
1988 //      returns with page unpinned
1989 //      from the page pool.
1990
1991 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
1992 {
1993 unsigned char lowerfence[BT_keyarray];
1994 uint page_size = mgr->page_size;
1995 BtPageSet right[1], temp[1];
1996 unsigned char value[BtId];
1997 uid page_no, right2;
1998 BtKey *ptr;
1999
2000         if( !lvl )
2001                 page_size <<= mgr->leaf_xtra;
2002
2003         //  cache copy of original fence key
2004         //      that is being deleted.
2005
2006         ptr = keyptr(set->page, set->page->cnt);
2007         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
2008
2009         //      obtain locks on right page
2010
2011         page_no = bt_getid(set->page->right);
2012
2013         if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
2014                 right->page = bt_mappage (mgr, right->latch);
2015         else
2016                 return 0;
2017
2018         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2019
2020         if( right->page->kill )
2021                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
2022
2023         // pull contents of right peer into our empty page
2024         //      preserving our left page number, and its right page number.
2025
2026         bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
2027         page_no = bt_getid(set->page->left);
2028         memcpy (set->page, right->page, page_size);
2029         bt_putid (set->page->left, page_no);
2030         bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
2031
2032         set->page->page_no = set->latch->page_no;
2033         set->latch->dirty = 1;
2034
2035         //  fix left link from far right page
2036
2037         if( right2 = bt_getid (right->page->right) ) {
2038           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2039                 temp->page = bt_mappage (mgr, temp->latch);
2040           else
2041                 return 0;
2042
2043       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2044           bt_putid (temp->page->left, set->latch->page_no);
2045           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2046           bt_unpinlatch (temp->latch, thread_no, __LINE__);
2047         } else if( !lvl ) {     // our page is now rightmost leaf
2048           bt_mutexlock (mgr->lock);
2049           bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
2050           bt_releasemutex(mgr->lock);
2051         }
2052
2053         // mark right page deleted and release lock
2054
2055         right->latch->dirty = 1;
2056         right->page->kill = 1;
2057         bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2058
2059         // redirect higher key directly to our new node contents
2060
2061         ptr = keyptr(right->page, right->page->cnt);
2062         bt_putid (value, set->latch->page_no);
2063
2064         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
2065           return mgr->err;
2066
2067         //      delete our orignal fence key in parent
2068         //      and unlock our page.
2069
2070         ptr = (BtKey *)lowerfence;
2071
2072         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
2073           return mgr->err;
2074
2075         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2076         bt_unpinlatch (set->latch, thread_no, __LINE__);
2077
2078         //      obtain delete and write locks to right node
2079
2080         bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
2081         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2082         bt_freepage (mgr, right, thread_no);
2083         return 0;
2084 }
2085
2086 //  find and delete key on page by marking delete flag bit
2087 //  if page becomes empty, delete it from the btree
2088
2089 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2090 {
2091 uint slot, idx, found, fence;
2092 BtPageSet set[1];
2093 BtSlot *node;
2094 BtKey *ptr;
2095 BtVal *val;
2096
2097         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2098                 node = slotptr(set->page, slot);
2099                 ptr = keyptr(set->page, slot);
2100         } else
2101                 return mgr->err_thread = thread_no, mgr->err;
2102
2103         // if librarian slot, advance to real slot
2104
2105         if( node->type == Librarian ) {
2106                 ptr = keyptr(set->page, ++slot);
2107                 node = slotptr(set->page, slot);
2108         }
2109
2110         fence = slot == set->page->cnt;
2111
2112         // delete the key, ignore request if already dead
2113
2114         if( found = !keycmp (ptr, key, len) )
2115           if( found = node->dead == 0 ) {
2116                 val = valptr(set->page,slot);
2117                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2118                 set->page->act--;
2119                 node->dead = 1;
2120
2121                 // collapse empty slots beneath the fence
2122                 // on interiour nodes
2123
2124                 if( lvl )
2125                  while( idx = set->page->cnt - 1 )
2126                   if( slotptr(set->page, idx)->dead ) {
2127                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2128                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2129                   } else
2130                         break;
2131           }
2132
2133         if( !found )
2134                 return 0;
2135
2136         //      did we delete a fence key in an upper level?
2137
2138         if( lvl && set->page->act && fence )
2139           if( bt_fixfence (mgr, set, lvl, thread_no) )
2140                 return mgr->err_thread = thread_no, mgr->err;
2141           else
2142                 return 0;
2143
2144         //      do we need to collapse root?
2145
2146         if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2147           if( bt_collapseroot (mgr, set, thread_no) )
2148                 return mgr->err_thread = thread_no, mgr->err;
2149           else
2150                 return 0;
2151
2152         //      delete empty page
2153
2154         if( !set->page->act )
2155           return bt_deletepage (mgr, set, thread_no, set->page->lvl);
2156
2157         set->latch->dirty = 1;
2158         bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2159         bt_unpinlatch (set->latch, thread_no, __LINE__);
2160         return 0;
2161 }
2162
2163 //      check page for space available,
2164 //      clean if necessary and return
2165 //      0 - page needs splitting
2166 //      >0  new slot value
2167
2168 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2169 {
2170 uint page_size = mgr->page_size;
2171 BtPage page = set->page, frame;
2172 uint cnt = 0, idx = 0;
2173 uint max = page->cnt;
2174 uint newslot = max;
2175 BtKey *key;
2176 BtVal *val;
2177
2178         if( !set->page->lvl )
2179                 page_size <<= mgr->leaf_xtra;
2180
2181         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2182                 return slot;
2183
2184         //      skip cleanup and proceed to split
2185         //      if there's not enough garbage
2186         //      to bother with.
2187
2188         if( page->garbage < page_size / 5 )
2189                 return 0;
2190
2191         frame = malloc (page_size);
2192         memcpy (frame, page, page_size);
2193
2194         // skip page info and set rest of page to zero
2195
2196         memset (page+1, 0, page_size - sizeof(*page));
2197         set->latch->dirty = 1;
2198
2199         page->min = page_size;
2200         page->garbage = 0;
2201         page->act = 0;
2202
2203         // clean up page first by
2204         // removing deleted keys
2205
2206         while( cnt++ < max ) {
2207                 if( cnt == slot )
2208                         newslot = idx + 2;
2209
2210                 if( cnt < max || frame->lvl )
2211                   if( slotptr(frame,cnt)->dead )
2212                         continue;
2213
2214                 // copy the value across
2215
2216                 val = valptr(frame, cnt);
2217                 page->min -= val->len + sizeof(BtVal);
2218                 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2219
2220                 // copy the key across
2221
2222                 key = keyptr(frame, cnt);
2223                 page->min -= key->len + sizeof(BtKey);
2224                 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2225
2226                 // make a librarian slot
2227
2228                 slotptr(page, ++idx)->off = page->min;
2229                 slotptr(page, idx)->type = Librarian;
2230                 slotptr(page, idx)->dead = 1;
2231
2232                 // set up the slot
2233
2234                 slotptr(page, ++idx)->off = page->min;
2235                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2236
2237                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2238                         page->act++;
2239         }
2240
2241         page->cnt = idx;
2242         free (frame);
2243
2244         //      see if page has enough space now, or does it need splitting?
2245
2246         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2247                 return newslot;
2248
2249         return 0;
2250 }
2251
2252 // split the root and raise the height of the btree
2253
2254 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2255 {  
2256 unsigned char leftkey[BT_keyarray];
2257 uint nxt = mgr->page_size;
2258 unsigned char value[BtId];
2259 BtPage frame, page;
2260 BtPageSet left[1];
2261 uid left_page_no;
2262 BtKey *ptr;
2263 BtVal *val;
2264
2265         frame = malloc (mgr->page_size);
2266         memcpy (frame, root->page, mgr->page_size);
2267
2268         //      save left page fence key for new root
2269
2270         ptr = keyptr(root->page, root->page->cnt);
2271         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2272
2273         //  Obtain an empty page to use, and copy the current
2274         //  root contents into it, e.g. lower keys
2275
2276         if( bt_newpage(mgr, left, frame, thread_no) )
2277                 return mgr->err_thread = thread_no, mgr->err;
2278
2279         left_page_no = left->latch->page_no;
2280         bt_unpinlatch (left->latch, thread_no, __LINE__);
2281         free (frame);
2282
2283         //      left link the pages together
2284
2285         page = bt_mappage (mgr, right);
2286         bt_putid (page->left, left_page_no);
2287
2288         // preserve the page info at the bottom
2289         // of higher keys and set rest to zero
2290
2291         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2292
2293         // insert stopper key at top of newroot page
2294         // and increase the root height
2295
2296         nxt -= BtId + sizeof(BtVal);
2297         bt_putid (value, right->page_no);
2298         val = (BtVal *)((unsigned char *)root->page + nxt);
2299         memcpy (val->value, value, BtId);
2300         val->len = BtId;
2301
2302         nxt -= 2 + sizeof(BtKey);
2303         slotptr(root->page, 2)->off = nxt;
2304         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2305         ptr->len = 2;
2306         ptr->key[0] = 0xff;
2307         ptr->key[1] = 0xff;
2308
2309         // insert lower keys page fence key on newroot page as first key
2310
2311         nxt -= BtId + sizeof(BtVal);
2312         bt_putid (value, left_page_no);
2313         val = (BtVal *)((unsigned char *)root->page + nxt);
2314         memcpy (val->value, value, BtId);
2315         val->len = BtId;
2316
2317         ptr = (BtKey *)leftkey;
2318         nxt -= ptr->len + sizeof(BtKey);
2319         slotptr(root->page, 1)->off = nxt;
2320         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2321         
2322         bt_putid(root->page->right, 0);
2323         root->page->min = nxt;          // reset lowest used offset and key count
2324         root->page->cnt = 2;
2325         root->page->act = 2;
2326         root->page->lvl++;
2327
2328         mgr->pagezero->alloc->lvl = root->page->lvl;
2329
2330         // release and unpin root pages
2331
2332         bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2333         bt_unpinlatch (root->latch, thread_no, __LINE__);
2334
2335         bt_unpinlatch (right, thread_no, __LINE__);
2336         return 0;
2337 }
2338
2339 //  split already locked full node
2340 //      leave it locked.
2341 //      return pool entry for new right
2342 //      page, pinned & unlocked
2343
2344 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
2345 {
2346 uint page_size = mgr->page_size;
2347 BtPageSet right[1], temp[1];
2348 uint cnt = 0, idx = 0, max;
2349 uint lvl = set->page->lvl;
2350 BtPage frame;
2351 BtKey *key;
2352 BtVal *val;
2353 uid right2;
2354 uint entry;
2355 uint prev;
2356
2357         if( !set->page->lvl )
2358                 page_size <<= mgr->leaf_xtra;
2359
2360         //  split higher half of keys to frame
2361
2362         frame = malloc (page_size);
2363         memset (frame, 0, page_size);
2364         frame->min = page_size;
2365         max = set->page->cnt;
2366         cnt = max / 2;
2367         idx = 0;
2368
2369         while( cnt++ < max ) {
2370                 if( cnt < max || set->page->lvl )
2371                   if( slotptr(set->page, cnt)->dead )
2372                         continue;
2373
2374                 val = valptr(set->page, cnt);
2375                 frame->min -= val->len + sizeof(BtVal);
2376                 memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
2377
2378                 key = keyptr(set->page, cnt);
2379                 frame->min -= key->len + sizeof(BtKey);
2380                 memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
2381
2382                 //      add librarian slot
2383
2384                 slotptr(frame, ++idx)->off = frame->min;
2385                 slotptr(frame, idx)->type = Librarian;
2386                 slotptr(frame, idx)->dead = 1;
2387
2388                 //  add actual slot
2389
2390                 slotptr(frame, ++idx)->off = frame->min;
2391                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2392
2393                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2394                         frame->act++;
2395         }
2396
2397         frame->cnt = idx;
2398         frame->lvl = lvl;
2399
2400         // link right node
2401
2402         if( set->latch->page_no > ROOT_page ) {
2403                 right2 = bt_getid (set->page->right);
2404                 bt_putid (frame->right, right2);
2405
2406                 if( linkleft )
2407                         bt_putid (frame->left, set->latch->page_no);
2408         }
2409
2410         //      get new free page and write higher keys to it.
2411
2412         if( bt_newpage(mgr, right, frame, thread_no) )
2413                 return 0;
2414
2415         //      link far right's left pointer to new page
2416
2417         if( linkleft && set->latch->page_no > ROOT_page )
2418          if( right2 ) {
2419           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2420                 temp->page = bt_mappage (mgr, temp->latch);
2421           else
2422                 return 0;
2423
2424       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2425           bt_putid (temp->page->left, right->latch->page_no);
2426           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2427           bt_unpinlatch (temp->latch, thread_no, __LINE__);
2428          } else if( !lvl ) {    // page is rightmost leaf
2429           bt_mutexlock (mgr->lock);
2430           bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
2431           bt_releasemutex(mgr->lock);
2432          }
2433
2434         // process lower keys
2435
2436         memcpy (frame, set->page, page_size);
2437         memset (set->page+1, 0, page_size - sizeof(*set->page));
2438         set->latch->dirty = 1;
2439
2440         set->page->min = page_size;
2441         set->page->garbage = 0;
2442         set->page->act = 0;
2443         max /= 2;
2444         cnt = 0;
2445         idx = 0;
2446
2447         //  assemble page of smaller keys
2448
2449         while( cnt++ < max ) {
2450                 if( slotptr(frame, cnt)->dead )
2451                         continue;
2452                 val = valptr(frame, cnt);
2453                 set->page->min -= val->len + sizeof(BtVal);
2454                 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2455
2456                 key = keyptr(frame, cnt);
2457                 set->page->min -= key->len + sizeof(BtKey);
2458                 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2459
2460                 //      add librarian slot
2461
2462                 slotptr(set->page, ++idx)->off = set->page->min;
2463                 slotptr(set->page, idx)->type = Librarian;
2464                 slotptr(set->page, idx)->dead = 1;
2465
2466                 //      add actual slot
2467
2468                 slotptr(set->page, ++idx)->off = set->page->min;
2469                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2470                 set->page->act++;
2471         }
2472
2473         bt_putid(set->page->right, right->latch->page_no);
2474         set->page->cnt = idx;
2475         free(frame);
2476
2477         entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2478         return entry;
2479 }
2480
2481 //      fix keys for newly split page
2482 //      call with both pages pinned & locked
2483 //  return unlocked and unpinned
2484
2485 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2486 {
2487 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2488 unsigned char value[BtId];
2489 uint lvl = set->page->lvl;
2490 BtPageSet temp[1];
2491 BtPage page;
2492 BtKey *ptr;
2493 uid right2;
2494
2495         // if current page is the root page, split it
2496
2497         if( set->latch->page_no == ROOT_page )
2498                 return bt_splitroot (mgr, set, right, thread_no);
2499
2500         ptr = keyptr(set->page, set->page->cnt);
2501         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2502
2503         page = bt_mappage (mgr, right);
2504
2505         ptr = keyptr(page, page->cnt);
2506         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2507
2508         //      splice in far right page's left page_no
2509
2510         if( right2 = bt_getid (page->right) ) {
2511           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2512                 temp->page = bt_mappage (mgr, temp->latch);
2513           else
2514                 return 0;
2515
2516       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2517           bt_putid (temp->page->left, right->page_no);
2518           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2519           bt_unpinlatch (temp->latch, thread_no, __LINE__);
2520         } else if( !lvl ) {  // right page is far right page
2521           bt_mutexlock (mgr->lock);
2522           bt_putid (mgr->pagezero->alloc->left, right->page_no);
2523           bt_releasemutex(mgr->lock);
2524         }
2525         // insert new fences in their parent pages
2526
2527         bt_lockpage (BtLockParent, right, thread_no, __LINE__);
2528
2529         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2530         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2531
2532         // insert new fence for reformulated left block of smaller keys
2533
2534         bt_putid (value, set->latch->page_no);
2535         ptr = (BtKey *)leftkey;
2536
2537         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2538                 return mgr->err_thread = thread_no, mgr->err;
2539
2540         // switch fence for right block of larger keys to new right page
2541
2542         bt_putid (value, right->page_no);
2543         ptr = (BtKey *)rightkey;
2544
2545         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2546                 return mgr->err_thread = thread_no, mgr->err;
2547
2548         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2549         bt_unpinlatch (set->latch, thread_no, __LINE__);
2550
2551         bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2552         bt_unpinlatch (right, thread_no, __LINE__);
2553         return 0;
2554 }
2555
2556 //      install new key and value onto page
2557 //      page must already be checked for
2558 //      adequate space
2559
2560 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2561 {
2562 uint idx, librarian;
2563 BtSlot *node;
2564 BtKey *ptr;
2565 BtVal *val;
2566 int rate;
2567
2568         //      if previous slot is a librarian slot, use it
2569
2570         if( slot > 1 )
2571           if( slotptr(set->page, slot-1)->type == Librarian )
2572                 slot--;
2573
2574         // copy value onto page
2575
2576         set->page->min -= vallen + sizeof(BtVal);
2577         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2578         memcpy (val->value, value, vallen);
2579         val->len = vallen;
2580
2581         // copy key onto page
2582
2583         set->page->min -= keylen + sizeof(BtKey);
2584         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2585         memcpy (ptr->key, key, keylen);
2586         ptr->len = keylen;
2587         
2588         //      find first empty slot at or above our insert slot
2589
2590         for( idx = slot; idx < set->page->cnt; idx++ )
2591           if( slotptr(set->page, idx)->dead )
2592                 break;
2593
2594         // now insert key into array before slot.
2595
2596         //      if we're going all the way to the top,
2597         //      add as many librarian slots as
2598         //      makes sense.
2599
2600         if( idx == set->page->cnt ) {
2601         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2602
2603                 librarian = ++idx - slot;
2604                 avail /= sizeof(BtSlot);
2605
2606                 if( avail < 0 )
2607                         avail = 0;
2608
2609                 if( librarian > avail )
2610                         librarian = avail;
2611
2612                 if( librarian ) {
2613                         rate = (idx - slot) / librarian;
2614                         set->page->cnt += librarian;
2615                         idx += librarian;
2616                 } else
2617                         rate = 0;
2618         } else
2619                 librarian = 0, rate = 0;
2620
2621         //  transfer slots and add librarian slots
2622
2623         while( idx > slot ) {
2624                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2625
2626                 //      add librarian slot per rate
2627
2628                 if( librarian )
2629                  if( (idx - slot)/2 <= librarian * rate ) {
2630                         node = slotptr(set->page, --idx);
2631                         node->off = node[1].off;
2632                         node->type = Librarian;
2633                         node->dead = 1;
2634                         librarian--;
2635                   }
2636
2637                 --idx;
2638         }
2639
2640         set->latch->dirty = 1;
2641         set->page->act++;
2642
2643         //      fill in new slot
2644
2645         node = slotptr(set->page, slot);
2646         node->off = set->page->min;
2647         node->type = type;
2648         node->dead = 0;
2649         return 0;
2650 }
2651
2652 //  Insert new key into the btree at given level.
2653 //      either add a new key or update/add an existing one
2654
2655 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2656 {
2657 uint slot, idx, len, entry;
2658 BtPageSet set[1];
2659 BtSlot *node;
2660 BtKey *ptr;
2661 BtVal *val;
2662
2663   while( 1 ) { // find the page and slot for the current key
2664         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2665                 node = slotptr(set->page, slot);
2666                 ptr = keyptr(set->page, slot);
2667         } else
2668                 return mgr->err;
2669
2670         // if librarian slot == found slot, advance to real slot
2671
2672         if( node->type == Librarian ) {
2673                 node = slotptr(set->page, ++slot);
2674                 ptr = keyptr(set->page, slot);
2675         }
2676
2677         //  if inserting a duplicate key or unique
2678         //      key that doesn't exist on the page,
2679         //      check for adequate space on the page
2680         //      and insert the new key before slot.
2681
2682         switch( type ) {
2683         case Unique:
2684         case Duplicate:
2685           if( !keycmp (ptr, key, keylen) )
2686                 break;
2687
2688           if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2689             if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
2690                   return mgr->err;
2691                 else
2692                   goto insxit;
2693
2694           if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2695             if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2696                   continue;
2697
2698           return mgr->err_thread = thread_no, mgr->err;
2699
2700         case Update:
2701           if( keycmp (ptr, key, keylen) )
2702                 goto insxit;
2703
2704           break;
2705         }
2706
2707         // if key already exists, update value and return
2708
2709         val = valptr(set->page, slot);
2710
2711         if( val->len >= vallen ) {
2712           if( node->dead )
2713                 set->page->act++;
2714           node->type = type;
2715           node->dead = 0;
2716
2717           set->page->garbage += val->len - vallen;
2718           set->latch->dirty = 1;
2719           val->len = vallen;
2720           memcpy (val->value, value, vallen);
2721           goto insxit;
2722         }
2723
2724         //  new update value doesn't fit in existing value area
2725         //      make sure page has room
2726
2727         if( !node->dead )
2728           set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2729         else
2730           set->page->act++;
2731
2732         node->type = type;
2733         node->dead = 0;
2734
2735         if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2736           break;
2737
2738         if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2739           if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2740                 continue;
2741
2742         return mgr->err_thread = thread_no, mgr->err;
2743   }
2744
2745   //  copy key and value onto page and update slot
2746
2747   set->page->min -= vallen + sizeof(BtVal);
2748   val = (BtVal*)((unsigned char *)set->page + set->page->min);
2749   memcpy (val->value, value, vallen);
2750   val->len = vallen;
2751
2752   set->latch->dirty = 1;
2753   set->page->min -= keylen + sizeof(BtKey);
2754   ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2755   memcpy (ptr->key, key, keylen);
2756   ptr->len = keylen;
2757         
2758   slotptr(set->page,slot)->off = set->page->min;
2759
2760 insxit:
2761   bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2762   bt_unpinlatch (set->latch, thread_no, __LINE__);
2763   return 0;
2764 }
2765
2766 //      determine actual page where key is located
2767 //  return slot number
2768
2769 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2770 {
2771 BtKey *key = keyptr(source,src), *ptr;
2772 uint slot = locks[src].slot;
2773 uint entry;
2774
2775         if( locks[src].reuse )
2776           entry = locks[src-1].entry;
2777         else
2778           entry = locks[src].entry;
2779
2780         if( slot ) {
2781                 set->latch = mgr->leafsets + entry;
2782                 set->page = bt_mappage (mgr, set->latch);
2783                 return slot;
2784         }
2785
2786         //      find where our key is located 
2787         //      on current page or pages split on
2788         //      same page txn operations.
2789
2790         do {
2791                 set->latch = mgr->leafsets + entry;
2792                 set->page = bt_mappage (mgr, set->latch);
2793
2794                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2795                   if( slotptr(set->page, slot)->type == Librarian )
2796                         slot++;
2797                   if( locks[src].reuse )
2798                         locks[src].entry = entry;
2799                   return slot;
2800                 }
2801         } while( entry = set->latch->split );
2802
2803         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2804         return 0;
2805 }
2806
2807 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2808 {
2809 BtKey *key = keyptr(source, src);
2810 BtVal *val = valptr(source, src);
2811 BtLatchSet *latch;
2812 BtPageSet set[1];
2813 uint entry, slot;
2814
2815   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2816         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2817           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
2818                 return mgr->err_thread = thread_no, mgr->err;
2819
2820           set->page->lsn = lsn;
2821           return 0;
2822         }
2823
2824         //  split page
2825
2826         if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2827           latch = mgr->leafsets + entry;
2828         else
2829           return mgr->err;
2830
2831         //      splice right page into split chain
2832         //      and WriteLock it
2833
2834         bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2835         latch->split = set->latch->split;
2836         set->latch->split = entry;
2837
2838         // clear slot number for atomic page
2839
2840         locks[src].slot = 0;
2841   }
2842
2843   return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2844 }
2845
2846 //      perform delete from smaller btree
2847 //  insert a delete slot if not found there
2848
2849 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2850 {
2851 BtKey *key = keyptr(source, src);
2852 BtPageSet set[1];
2853 uint idx, slot;
2854 BtSlot *node;
2855 BtKey *ptr;
2856 BtVal *val;
2857
2858         if( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2859           node = slotptr(set->page, slot);
2860           ptr = keyptr(set->page, slot);
2861           val = valptr(set->page, slot);
2862         } else
2863           return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2864
2865         //  if slot is not found, insert a delete slot
2866
2867         if( keycmp (ptr, key->key, key->len) )
2868           if( bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete) )
2869                 return mgr->err;
2870
2871         //      if node is already dead,
2872         //      ignore the request.
2873
2874         if( node->dead )
2875                 return 0;
2876
2877         set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2878         set->latch->dirty = 1;
2879         set->page->lsn = lsn;
2880         set->page->act--;
2881
2882         node->dead = 0;
2883         __sync_fetch_and_add(&mgr->found, 1);
2884         return 0;
2885 }
2886
2887 //      release master's splits from right to left
2888
2889 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2890 {
2891 BtLatchSet *latch = mgr->leafsets + entry;
2892
2893         if( latch->split )
2894                 bt_atomicrelease (mgr, latch->split, thread_no);
2895
2896         latch->split = 0;
2897         bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2898         bt_unpinlatch(latch, thread_no, __LINE__);
2899 }
2900
2901 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2902 {
2903 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2904 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2905
2906         return keycmp (key1, key2->key, key2->len);
2907 }
2908 //      atomic modification of a batch of keys.
2909
2910 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2911 {
2912 uint src, idx, slot, samepage, entry, que = 0;
2913 BtKey *key, *ptr, *key2;
2914 int result = 0;
2915 BtSlot temp[1];
2916 logseqno lsn;
2917 int type;
2918
2919   // stable sort the list of keys into order to
2920   //    prevent deadlocks between threads.
2921 /*
2922   for( src = 1; src++ < source->cnt; ) {
2923         *temp = *slotptr(source,src);
2924         key = keyptr (source,src);
2925
2926         for( idx = src; --idx; ) {
2927           key2 = keyptr (source,idx);
2928           if( keycmp (key, key2->key, key2->len) < 0 ) {
2929                 *slotptr(source,idx+1) = *slotptr(source,idx);
2930                 *slotptr(source,idx) = *temp;
2931           } else
2932                 break;
2933         }
2934   }
2935 */
2936         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2937   //  add entries to redo log
2938
2939   if( bt->mgr->pagezero->redopages )
2940         lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2941   else
2942         lsn = 0;
2943
2944   //  perform the individual actions in the transaction
2945
2946   if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2947         return bt->mgr->err;
2948
2949   // if number of active pages
2950   // is greater than the buffer pool
2951   // promote page into larger btree
2952
2953   if( bt->main )
2954    while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
2955         if( bt_promote (bt) )
2956           return bt->mgr->err;
2957
2958   // return success
2959
2960   return 0;
2961 }
2962
2963 //      execute the source list of inserts/deletes
2964
2965 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2966 {
2967 uint slot, src, idx, samepage, entry;
2968 BtPageSet set[1], prev[1];
2969 unsigned char value[BtId];
2970 BtLatchSet *latch;
2971 uid right_page_no;
2972 AtomicTxn *locks;
2973 BtKey *key, *ptr;
2974 BtPage page;
2975 BtVal *val;
2976
2977   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2978
2979   // Load the leaf page for each key
2980   // group same page references with reuse bit
2981
2982   for( src = 0; src++ < source->cnt; ) {
2983         key = keyptr(source, src);
2984
2985         // first determine if this modification falls
2986         // on the same page as the previous modification
2987         //      note that the far right leaf page is a special case
2988
2989         if( samepage = src > 1 )
2990           samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
2991
2992         if( !samepage )
2993           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
2994                 ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
2995           else
2996                 return mgr->err;
2997         else
2998           slot = 0;
2999
3000         if( slot )
3001          if( slotptr(set->page, slot)->type == Librarian )
3002           slot++;
3003
3004         entry = set->latch - mgr->leafsets;
3005         locks[src].reuse = samepage;
3006         locks[src].entry = entry;
3007         locks[src].slot = slot;
3008
3009         //      capture current lsn for master page
3010
3011         locks[src].reqlsn = set->page->lsn;
3012   }
3013
3014   // insert or delete each key
3015   // process any splits or merges
3016   // run through txn list backwards
3017
3018   samepage = source->cnt + 1;
3019
3020   for( src = source->cnt; src; src-- ) {
3021         if( locks[src].reuse )
3022           continue;
3023
3024         //  perform the txn operations
3025         //      from smaller to larger on
3026         //  the same page
3027
3028         for( idx = src; idx < samepage; idx++ )
3029          switch( slotptr(source,idx)->type ) {
3030          case Delete:
3031           if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
3032                 return mgr->err;
3033           break;
3034
3035         case Duplicate:
3036         case Unique:
3037           if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
3038                 return mgr->err;
3039           break;
3040
3041         default:
3042           bt_atomicpage (mgr, source, locks, idx, set);
3043           break;
3044         }
3045
3046         //      after the same page operations have finished,
3047         //  process master page for splits or deletion.
3048
3049         latch = prev->latch = mgr->leafsets + locks[src].entry;
3050         prev->page = bt_mappage (mgr, prev->latch);
3051         samepage = src;
3052
3053         //  pick-up all splits from master page
3054         //      each one is already pinned & WriteLocked.
3055
3056         while( entry = prev->latch->split ) {
3057           set->latch = mgr->leafsets + entry;
3058           set->page = bt_mappage (mgr, set->latch);
3059
3060           // delete empty master page by undoing its split
3061           //  (this is potentially another empty page)
3062           //  note that there are no pointers to it yet
3063
3064           if( !prev->page->act ) {
3065                 memcpy (set->page->left, prev->page->left, BtId);
3066                 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
3067                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3068                 prev->latch->split = set->latch->split;
3069                 prev->latch->dirty = 1;
3070                 bt_freepage (mgr, set, thread_no);
3071                 continue;
3072           }
3073
3074           // remove empty split page from the split chain
3075           // and return it to the free list. No other
3076           // thread has its page number yet.
3077
3078           if( !set->page->act ) {
3079                 memcpy (prev->page->right, set->page->right, BtId);
3080                 prev->latch->split = set->latch->split;
3081
3082                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3083                 bt_freepage (mgr, set, thread_no);
3084                 continue;
3085           }
3086
3087           //  update prev's fence key
3088
3089           ptr = keyptr(prev->page,prev->page->cnt);
3090           bt_putid (value, prev->latch->page_no);
3091
3092           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3093                 return mgr->err;
3094
3095           // splice in the left link into the split page
3096
3097           bt_putid (set->page->left, prev->latch->page_no);
3098
3099           if( lsm )
3100                 bt_syncpage (mgr, prev->page, prev->latch);
3101
3102           *prev = *set;
3103         }
3104
3105         //  update left pointer in next right page from last split page
3106         //      (if all splits were reversed or none occurred, latch->split == 0)
3107
3108         if( latch->split ) {
3109           //  fix left pointer in master's original (now split)
3110           //  far right sibling or set rightmost page in page zero
3111
3112           if( right_page_no = bt_getid (prev->page->right) ) {
3113                 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3114                   set->page = bt_mappage (mgr, set->latch);
3115                 else
3116                   return mgr->err;
3117
3118             bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3119             bt_putid (set->page->left, prev->latch->page_no);
3120                 set->latch->dirty = 1;
3121
3122             bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3123                 bt_unpinlatch (set->latch, thread_no, __LINE__);
3124           } else {      // prev is rightmost page
3125             bt_mutexlock (mgr->lock);
3126                 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3127             bt_releasemutex(mgr->lock);
3128           }
3129
3130           //  switch the original fence key from the
3131           //  master page to the last split page.
3132
3133           ptr = keyptr(prev->page,prev->page->cnt);
3134           bt_putid (value, prev->latch->page_no);
3135
3136           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
3137                 return mgr->err;
3138
3139           if( lsm )
3140                 bt_syncpage (mgr, prev->page, prev->latch);
3141
3142           //  unlock and unpin the split pages
3143
3144           bt_atomicrelease (mgr, latch->split, thread_no);
3145
3146           //  unlock and unpin the master page
3147
3148           latch->split = 0;
3149           bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3150           bt_unpinlatch(latch, thread_no, __LINE__);
3151           continue;
3152         }
3153
3154         //      since there are no splits remaining, we're
3155         //  finished if master page occupied
3156
3157         if( prev->page->act ) {
3158           bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3159
3160           if( lsm )
3161                 bt_syncpage (mgr, prev->page, prev->latch);
3162
3163           bt_unpinlatch(prev->latch, thread_no, __LINE__);
3164           continue;
3165         }
3166
3167         // any and all splits were reversed, and the
3168         // master page located in prev is empty, delete it
3169
3170         if( bt_deletepage (mgr, prev, thread_no, 0) )
3171                 return mgr->err;
3172   }
3173
3174   free (locks);
3175   return 0;
3176 }
3177
3178 //  pick & promote a page into the larger btree
3179
3180 BTERR bt_promote (BtDb *bt)
3181 {
3182 uint entry, slot, idx;
3183 BtPageSet set[1];
3184 BtSlot *node;
3185 BtKey *ptr;
3186 BtVal *val;
3187
3188   while( 1 ) {
3189 #ifdef unix
3190         entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3191 #else
3192         entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3193 #endif
3194         entry %= bt->mgr->leaftotal;
3195
3196         if( !entry )
3197                 continue;
3198
3199         set->latch = bt->mgr->leafsets + entry;
3200
3201         //  skip this entry if it has never been used
3202
3203         if( !set->latch->page_no )
3204           continue;
3205
3206         if( !bt_mutextry(set->latch->modify) )
3207                 continue;
3208
3209         //  skip this entry if it is pinned
3210
3211         if( set->latch->pin & ~CLOCK_bit ) {
3212           bt_releasemutex(set->latch->modify);
3213           continue;
3214         }
3215
3216         set->page = bt_mappage (bt->mgr, set->latch);
3217
3218         // entry has no right sibling
3219
3220         if( !bt_getid (set->page->right) ) {
3221           bt_releasemutex(set->latch->modify);
3222           continue;
3223         }
3224
3225         // entry interiour node or being killed or constructed
3226
3227         if( set->page->lvl || set->page->nopromote || set->page->kill ) {
3228           bt_releasemutex(set->latch->modify);
3229           continue;
3230         }
3231
3232         //  pin the page for our access
3233         //      and leave it locked for the
3234         //      duration of the promotion.
3235
3236         set->latch->pin++;
3237         bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3238         bt_releasemutex(set->latch->modify);
3239
3240         // transfer slots in our selected page to the main btree
3241 if( !(entry % 100) )
3242 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
3243
3244         if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3245                 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3246                 return bt->main->err;
3247         }
3248
3249         //  now delete the page
3250
3251         if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3252                 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3253
3254         return bt->mgr->err;
3255   }
3256 }
3257
3258 //      find unique key == given key, or first duplicate key in
3259 //      leaf level and return number of value bytes
3260 //      or (-1) if not found.
3261
3262 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
3263 {
3264 int ret = -1, type;
3265 BtPageSet set[1];
3266 BtSlot *node;
3267 BtKey *ptr;
3268 BtVal *val;
3269 uint slot;
3270
3271  for( type = 0; type < 2; type++ )
3272   if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) {
3273         node = slotptr(set->page, slot);
3274
3275         //      skip librarian slot place holder
3276
3277         if( node->type == Librarian )
3278           node = slotptr(set->page, ++slot);
3279
3280         ptr = keyptr(set->page, slot);
3281
3282         //      not there if we reach the stopper key
3283         //  or the key doesn't match what's on the page.
3284
3285         if( slot == set->page->cnt )
3286           if( !bt_getid (set->page->right) ) {
3287                 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3288                 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3289                 continue;
3290         }
3291
3292         if( keycmp (ptr, key, keylen) ) {
3293                 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3294                 bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3295                 continue;
3296         }
3297
3298         // key matches, return >= 0 value bytes copied
3299         //      or -1 if not there.
3300
3301         if( node->type == Delete || node->dead ) {
3302                 ret = -1;
3303                 goto findxit;
3304         }
3305
3306         val = valptr (set->page,slot);
3307
3308         if( valmax > val->len )
3309                 valmax = val->len;
3310
3311         memcpy (value, val->value, valmax);
3312         ret = valmax;
3313         goto findxit;
3314   }
3315
3316   ret = -1;
3317
3318 findxit:
3319   if( type < 2 ) {
3320         bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3321         bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3322   }
3323   return ret;
3324 }
3325
3326 //      set cursor to highest slot on left-most page
3327
3328 BTERR bt_lastkey (BtDb *bt)
3329 {
3330 uid cache_page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3331 uid main_page_no = bt_getid (bt->main->pagezero->alloc->left);
3332
3333         if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) )
3334                 bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
3335         else
3336                 return bt->mgr->err;
3337
3338     bt_lockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3339         bt->cacheslot = bt->cacheset->page->cnt;
3340
3341         if( bt->mainset->latch = bt_pinleaf (bt->main, main_page_no, bt->thread_no) )
3342                 bt->mainset->page = bt_mappage (bt->main, bt->mainset->latch);
3343         else
3344                 return bt->main->err;
3345
3346     bt_lockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3347         bt->mainslot = bt->mainset->page->cnt;
3348         bt->phase = 2;
3349         return 0;
3350 }
3351
3352 //      return previous slot on cursor page
3353
3354 uint bt_prevslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3355 {
3356 uid next, us = set->latch->page_no;
3357
3358   while( 1 ) {
3359         while( --slot )
3360           if( slotptr(set->page, slot)->dead )
3361                 continue;
3362           else
3363                 return slot;
3364
3365         next = bt_getid(set->page->left);
3366
3367         if( !next )
3368                 return 0;
3369
3370         do {
3371           bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3372           bt_unpinlatch (set->latch, thread_no, __LINE__);
3373
3374           if( set->latch = bt_pinleaf (mgr, next, thread_no) )
3375             set->page = bt_mappage (mgr, set->latch);
3376           else
3377             return 0;
3378
3379       bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3380           next = bt_getid (set->page->right);
3381
3382         } while( next != us );
3383
3384     slot = set->page->cnt + 1;
3385   }
3386 }
3387
3388 //  advance to previous key
3389
3390 BTERR bt_prevkey (BtDb *bt)
3391 {
3392 int cmp;
3393
3394   // first advance last key(s) one previous slot
3395
3396   while( 1 ) {
3397         switch( bt->phase ) {
3398         case 0:
3399           bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3400           break;
3401         case 1:
3402           bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3403           break;
3404         case 2:
3405           bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3406           bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3407         break;
3408         }
3409
3410   // return next key
3411
3412         if( bt->cacheslot ) {
3413           bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3414           bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3415           bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3416         }
3417
3418         if( bt->mainslot ) {
3419           bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3420           bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3421           bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3422         }
3423
3424         if( bt->mainslot && bt->cacheslot )
3425           cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3426         else if( bt->cacheslot )
3427           cmp = 1;
3428         else if( bt->mainslot )
3429           cmp = -1;
3430         else
3431           return 0;
3432
3433         //  cache key is larger
3434
3435         if( cmp > 0 ) {
3436           bt->phase = 0;
3437           if( bt->cachenode->type == Delete )
3438                 continue;
3439           return bt->cacheslot;
3440         }
3441
3442         //  main key is larger
3443
3444         if( cmp < 0 ) {
3445           bt->phase = 1;
3446           return bt->mainslot;
3447         }
3448
3449         //      keys are equal
3450
3451         bt->phase = 2;
3452
3453         if( bt->cachenode->type == Delete )
3454                 continue;
3455
3456         return bt->cacheslot;
3457   }
3458 }
3459
3460 //      advance to next slot in cache or main btree
3461 //      return 0 for EOF/error
3462
3463 uint bt_nextslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3464 {
3465 BtPage page;
3466 uid page_no;
3467
3468   while( 1 ) {
3469         while( slot++ < set->page->cnt )
3470           if( slotptr(set->page, slot)->dead )
3471                 continue;
3472           else if( slot < set->page->cnt || bt_getid (set->page->right) )
3473                 return slot;
3474           else
3475                 return 0;
3476
3477         bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3478         bt_unpinlatch (set->latch, thread_no, __LINE__);
3479
3480         if( page_no = bt_getid(set->page->right) )
3481           if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
3482                 set->page = bt_mappage (mgr, set->latch);
3483           else
3484                 return 0;
3485         else
3486           return 0; // EOF
3487
3488         // obtain access lock using lock chaining with Access mode
3489
3490         bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3491         bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3492         bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3493         slot = 0;
3494   }
3495 }
3496
3497 //  advance to next key
3498
3499 BTERR bt_nextkey (BtDb *bt)
3500 {
3501 int cmp;
3502
3503   // first advance last key(s) one next slot
3504
3505   while( 1 ) {
3506         switch( bt->phase ) {
3507         case 0:
3508           bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3509           break;
3510         case 1:
3511           bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3512           break;
3513         case 2:
3514           bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3515           bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3516         break;
3517         }
3518
3519   // return next key
3520
3521         if( bt->cacheslot ) {
3522           bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3523           bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3524           bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3525         }
3526
3527         if( bt->mainslot ) {
3528           bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3529           bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3530           bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3531         }
3532
3533         if( bt->mainslot && bt->cacheslot )
3534           cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3535         else if( bt->mainslot )
3536           cmp = 1;
3537         else if( bt->cacheslot )
3538           cmp = -1;
3539         else
3540           return 0;
3541
3542         //  main key is larger
3543
3544         if( cmp < 0 ) {
3545           bt->phase = 0;
3546           if( bt->cachenode->type == Delete )
3547                 continue;
3548           return bt->cacheslot;
3549         }
3550
3551         //  cache key is larger
3552
3553         if( cmp > 0 ) {
3554           bt->phase = 1;
3555           return bt->mainslot;
3556         }
3557
3558         //      keys are equal
3559
3560         bt->phase = 2;
3561
3562         if( bt->cachenode->type == Delete )
3563                 continue;
3564
3565         return bt->cacheslot;
3566   }
3567 }
3568
3569 //  start sweep of keys
3570
3571 BTERR bt_startkey (BtDb *bt, unsigned char *key, uint len)
3572 {
3573 BtPageSet set[1];
3574 uint slot;
3575
3576         // cache btree page
3577
3578         if( slot = bt_loadpage (bt->mgr, bt->cacheset, key, len, 0, BtLockRead, bt->thread_no) )
3579                 bt->cacheslot = slot - 1;
3580         else
3581           return bt->mgr->err;
3582
3583         // main btree page
3584
3585         if( slot = bt_loadpage (bt->main, bt->mainset, key, len, 0, BtLockRead, bt->thread_no) )
3586                 bt->mainslot = slot - 1;
3587         else
3588           return bt->mgr->err;
3589
3590         bt->phase = 2;
3591         return 0;
3592 }
3593
3594 #ifdef STANDALONE
3595
3596 #ifndef unix
3597 double getCpuTime(int type)
3598 {
3599 FILETIME crtime[1];
3600 FILETIME xittime[1];
3601 FILETIME systime[1];
3602 FILETIME usrtime[1];
3603 SYSTEMTIME timeconv[1];
3604 double ans = 0;
3605
3606         memset (timeconv, 0, sizeof(SYSTEMTIME));
3607
3608         switch( type ) {
3609         case 0:
3610                 GetSystemTimeAsFileTime (xittime);
3611                 FileTimeToSystemTime (xittime, timeconv);
3612                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3613                 break;
3614         case 1:
3615                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3616                 FileTimeToSystemTime (usrtime, timeconv);
3617                 break;
3618         case 2:
3619                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3620                 FileTimeToSystemTime (systime, timeconv);
3621                 break;
3622         }
3623
3624         ans += (double)timeconv->wHour * 3600;
3625         ans += (double)timeconv->wMinute * 60;
3626         ans += (double)timeconv->wSecond;
3627         ans += (double)timeconv->wMilliseconds / 1000;
3628         return ans;
3629 }
3630 #else
3631 #include <time.h>
3632 #include <sys/resource.h>
3633
3634 double getCpuTime(int type)
3635 {
3636 struct rusage used[1];
3637 struct timeval tv[1];
3638
3639         switch( type ) {
3640         case 0:
3641                 gettimeofday(tv, NULL);
3642                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3643
3644         case 1:
3645                 getrusage(RUSAGE_SELF, used);
3646                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3647
3648         case 2:
3649                 getrusage(RUSAGE_SELF, used);
3650                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3651         }
3652
3653         return 0;
3654 }
3655 #endif
3656
3657 void bt_poolaudit (BtMgr *mgr, char *type)
3658 {
3659 BtLatchSet *latch, test[1];
3660 uint entry;
3661
3662         memset (test, 0, sizeof(test));
3663
3664         if( memcmp (test, mgr->latchsets, sizeof(test)) )
3665                 fprintf(stderr, "%s latchset zero overwritten\n", type);
3666
3667         if( memcmp (test, mgr->leafsets, sizeof(test)) )
3668                 fprintf(stderr, "%s leafset zero overwritten\n", type);
3669
3670         for( entry = 0; ++entry < mgr->latchtotal; ) {
3671                 latch = mgr->latchsets + entry;
3672
3673                 if( *latch->modify->value )
3674                         fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
3675
3676                 if( latch->pin & ~CLOCK_bit )
3677                         fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3678         }
3679
3680         for( entry = 0; ++entry < mgr->leaftotal; ) {
3681                 latch = mgr->leafsets + entry;
3682
3683                 if( *latch->modify->value )
3684                         fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
3685
3686                 if( latch->pin & ~CLOCK_bit )
3687                         fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3688         }
3689 }
3690
3691 typedef struct {
3692         char idx;
3693         char *type;
3694         char *infile;
3695         BtMgr *main;
3696         BtMgr *mgr;
3697         int num;
3698 } ThreadArg;
3699
3700 //  standalone program to index file of keys
3701 //  then list them onto std-out
3702
3703 #ifdef unix
3704 void *index_file (void *arg)
3705 #else
3706 uint __stdcall index_file (void *arg)
3707 #endif
3708 {
3709 int line = 0, found = 0, cnt = 0, cachecnt, idx;
3710 uid next, page_no = LEAF_page;  // start on first page of leaves
3711 int ch, len = 0, slot, type = 0;
3712 unsigned char key[BT_maxkey];
3713 unsigned char buff[65536];
3714 uint nxt = sizeof(buff);
3715 ThreadArg *args = arg;
3716 BtPage page, frame;
3717 BtPageSet set[1];
3718 int vallen;
3719 BtKey *ptr;
3720 BtVal *val;
3721 BtDb *bt;
3722 FILE *in;
3723
3724         bt = bt_open (args->mgr, args->main);
3725         page = (BtPage)buff;
3726
3727         if( args->idx < strlen (args->type) )
3728                 ch = args->type[args->idx];
3729         else
3730                 ch = args->type[strlen(args->type) - 1];
3731
3732         switch(ch | 0x20)
3733         {
3734         case 'd':
3735                 type = Delete;
3736
3737         case 'p':
3738                 if( !type )
3739                         type = Unique;
3740
3741                 if( args->num )
3742                  if( type == Delete )
3743                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3744                  else
3745                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3746                 else
3747                  if( type == Delete )
3748                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3749                  else
3750                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3751
3752                 if( in = fopen (args->infile, "rb") )
3753                   while( ch = getc(in), ch != EOF )
3754                         if( ch == '\n' )
3755                         {
3756                           line++;
3757
3758                           if( !args->num ) {
3759                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3760                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3761                             len = 0;
3762                                 continue;
3763                           }
3764
3765                           nxt -= len - 10;
3766                           memcpy (buff + nxt, key + 10, len - 10);
3767                           nxt -= 1;
3768                           buff[nxt] = len - 10;
3769                           nxt -= 10;
3770                           memcpy (buff + nxt, key, 10);
3771                           nxt -= 1;
3772                           buff[nxt] = 10;
3773                           slotptr(page,++cnt)->off  = nxt;
3774                           slotptr(page,cnt)->type = type;
3775                           len = 0;
3776
3777                           if( cnt < args->num )
3778                                 continue;
3779
3780                           page->cnt = cnt;
3781                           page->act = cnt;
3782                           page->min = nxt;
3783
3784                           if( bt_atomictxn (bt, page) )
3785                                 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3786                           nxt = sizeof(buff);
3787                           cnt = 0;
3788
3789                         }
3790                         else if( len < BT_maxkey )
3791                                 key[len++] = ch;
3792                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3793                 break;
3794
3795         case 'w':
3796                 fprintf(stderr, "started indexing for %s\n", args->infile);
3797                 if( in = fopen (args->infile, "r") )
3798                   while( ch = getc(in), ch != EOF )
3799                         if( ch == '\n' )
3800                         {
3801                           line++;
3802
3803                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3804                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3805                           len = 0;
3806                         }
3807                         else if( len < BT_maxkey )
3808                                 key[len++] = ch;
3809                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3810                 break;
3811
3812         case 'f':
3813                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3814                 if( in = fopen (args->infile, "rb") )
3815                   while( ch = getc(in), ch != EOF )
3816                         if( ch == '\n' )
3817                         {
3818                           line++;
3819                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3820                                 found++;
3821                           else if( bt->mgr->err )
3822                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3823                           len = 0;
3824                         }
3825                         else if( len < BT_maxkey )
3826                                 key[len++] = ch;
3827                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3828                 break;
3829
3830         case 's':
3831                 fprintf(stderr, "started forward scan\n");
3832                 if( bt_startkey (bt, NULL, 0) )
3833                         fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3834
3835                 while( bt_nextkey (bt) ) {
3836                   if( bt->phase == 1 ) {
3837                         len = bt->mainkey->len;
3838
3839                         if( bt->mainnode->type == Duplicate )
3840                                 len -= BtId;
3841
3842                         fwrite (bt->mainkey->key, len, 1, stdout);
3843                         fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3844                   } else {
3845                         len = bt->cachekey->len;
3846
3847                         if( bt->cachenode->type == Duplicate )
3848                                 len -= BtId;
3849
3850                         fwrite (bt->cachekey->key, len, 1, stdout);
3851                         fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3852                   }
3853
3854                   fputc ('\n', stdout);
3855                   cnt++;
3856             }
3857
3858                 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3859                 bt_unpinlatch (bt->cacheset->latch, bt->thread_no, __LINE__);
3860
3861                 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3862                 bt_unpinlatch (bt->mainset->latch, bt->thread_no, __LINE__);
3863
3864                 fprintf(stderr, " Total keys read %d\n", cnt);
3865                 break;
3866
3867         case 'r':
3868                 fprintf(stderr, "started reverse scan\n");
3869                 if( bt_lastkey (bt) )
3870                         fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3871
3872                 while( bt_prevkey (bt) ) {
3873                   if( bt->phase == 1 ) {
3874                         len = bt->mainkey->len;
3875
3876                         if( bt->mainnode->type == Duplicate )
3877                                 len -= BtId;
3878
3879                         fwrite (bt->mainkey->key, len, 1, stdout);
3880                         fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3881                   } else {
3882                         len = bt->cachekey->len;
3883
3884                         if( bt->cachenode->type == Duplicate )
3885                                 len -= BtId;
3886
3887                         fwrite (bt->cachekey->key, len, 1, stdout);
3888                         fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3889                   }
3890
3891                   fputc ('\n', stdout);
3892                   cnt++;
3893             }
3894
3895                 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3896                 bt_unpinlatch (bt->cacheset->latch, bt->thread_no, __LINE__);
3897
3898                 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3899                 bt_unpinlatch (bt->mainset->latch, bt->thread_no, __LINE__);
3900
3901                 fprintf(stderr, " Total keys read %d\n", cnt);
3902                 break;
3903
3904         case 'c':
3905                 fprintf(stderr, "started counting LSM cache btree\n");
3906                 page_no = LEAF_page;
3907
3908                 do {
3909                   if( set->latch = bt_pinleaf (bt->mgr, page_no, bt->thread_no) )
3910                         set->page = bt_mappage (bt->mgr, set->latch);
3911                   else
3912                         return 0;
3913
3914                   bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3915                   cnt += set->page->act;
3916                   page_no = bt_getid (set->page->right);
3917                   bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3918                   bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3919                 } while( page_no );
3920                 
3921                 cachecnt = --cnt;       // remove stopper key
3922                 cnt = 0;
3923
3924                 fprintf(stderr, "started counting LSM main btree\n");
3925                 page_no = LEAF_page;
3926
3927                 do {
3928                   if( set->latch = bt_pinleaf (bt->main, page_no, bt->thread_no) )
3929                         set->page = bt_mappage (bt->main, set->latch);
3930                   else
3931                         return 0;
3932
3933                   bt_lockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3934                   cnt += set->page->act;
3935                   page_no = bt_getid (set->page->right);
3936                   bt_unlockpage(BtLockRead, set->latch, bt->thread_no, __LINE__);
3937                   bt_unpinlatch (set->latch, bt->thread_no, __LINE__);
3938                 } while( page_no );
3939                 
3940                 cnt--;  // remove stopper key
3941
3942                 fprintf(stderr, " cache keys counted %d\n", cachecnt);
3943                 fprintf(stderr, " main keys counted %d\n", cnt);
3944                 fprintf(stderr, " Total keys counted %d\n", cnt + cachecnt);
3945                 break;
3946         }
3947
3948         bt_close (bt);
3949 #ifdef unix
3950         return NULL;
3951 #else
3952         return 0;
3953 #endif
3954 }
3955
3956 typedef struct timeval timer;
3957
3958 int main (int argc, char **argv)
3959 {
3960 int idx, cnt, len, slot, err;
3961 double start, stop;
3962 #ifdef unix
3963 pthread_t *threads;
3964 #else
3965 HANDLE *threads;
3966 #endif
3967 ThreadArg *args;
3968 uint mainleafpool = 0;
3969 uint mainleafxtra = 0;
3970 uint redopages = 0;
3971 uint poolsize = 0;
3972 uint leafpool = 0;
3973 uint leafxtra = 0;
3974 uint mainpool = 0;
3975 uint mainbits = 0;
3976 int bits = 16;
3977 float elapsed;
3978 int num = 0;
3979 char key[1];
3980 BtMgr *main;
3981 BtMgr *mgr;
3982 BtKey *ptr;
3983
3984         if( argc < 3 ) {
3985                 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
3986                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3987                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3988                 fprintf (stderr, "  cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n");
3989                 fprintf (stderr, "  pagebits is the page size in bits for the cache btree\n");
3990                 fprintf (stderr, "  leafbits is the number of xtra bits for a leaf page\n");
3991                 fprintf (stderr, "  poolsize is the number of pages in buffer pool for the cache btree\n");
3992                 fprintf (stderr, "  leafpool is the number of leaf pages in leaf pool for the cache btree\n");
3993                 fprintf (stderr, "  txnsize = n to block transactions into n units, or zero for no transactions\n");
3994                 fprintf (stderr, "  redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3995                 fprintf (stderr, "  mainbits is the page size of the main btree in bits\n");
3996                 fprintf (stderr, "  mainpool is the number of main pages in the main buffer pool\n");
3997                 fprintf (stderr, "  mainleaf is the number of main pages in the main leaf pool\n");
3998                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3999                 exit(0);
4000         }
4001
4002         start = getCpuTime(0);
4003
4004         if( argc > 4 )
4005                 bits = atoi(argv[4]);
4006
4007         if( argc > 5 )
4008                 leafxtra = atoi(argv[5]);
4009
4010         if( argc > 6 )
4011                 poolsize = atoi(argv[6]);
4012
4013         if( !poolsize )
4014                 fprintf (stderr, "no page pool\n"), exit(1);
4015
4016         if( argc > 7 )
4017                 leafpool = atoi(argv[7]);
4018
4019         if( argc > 8 )
4020                 num = atoi(argv[8]);
4021
4022         if( argc > 9 )
4023                 redopages = atoi(argv[9]);
4024
4025         if( redopages > 65535 )
4026                 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
4027
4028         if( argc > 10 )
4029                 mainbits = atoi(argv[10]);
4030
4031         if( argc > 11 )
4032                 mainleafxtra = atoi(argv[11]);
4033
4034         if( argc > 12 )
4035                 mainpool = atoi(argv[12]);
4036
4037         if( argc > 13 )
4038                 mainleafpool = atoi(argv[13]);
4039
4040         cnt = argc - 14;
4041 #ifdef unix
4042         threads = malloc (cnt * sizeof(pthread_t));
4043 #else
4044         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
4045 #endif
4046         args = malloc ((cnt + 1) * sizeof(ThreadArg));
4047
4048         mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
4049
4050         if( !mgr ) {
4051                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
4052                 exit (1);
4053         }
4054
4055         if( mainbits ) {
4056                 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
4057
4058                 if( !main ) {
4059                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
4060                         exit (1);
4061                 }
4062         } else
4063                 main = NULL;
4064
4065         //      fire off threads
4066
4067         if( cnt )
4068           for( idx = 0; idx < cnt; idx++ ) {
4069                 args[idx].infile = argv[idx + 14];
4070                 args[idx].type = argv[3];
4071                 args[idx].main = main;
4072                 args[idx].mgr = mgr;
4073                 args[idx].num = num;
4074                 args[idx].idx = idx;
4075 #ifdef unix
4076                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
4077                         fprintf(stderr, "Error creating thread %d\n", err);
4078 #else
4079                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
4080 #endif
4081           }
4082         else {
4083                 args[0].infile = argv[idx + 10];
4084                 args[0].type = argv[3];
4085                 args[0].main = main;
4086                 args[0].mgr = mgr;
4087                 args[0].num = num;
4088                 args[0].idx = 0;
4089                 index_file (args);
4090         }
4091
4092         //      wait for termination
4093
4094 #ifdef unix
4095         for( idx = 0; idx < cnt; idx++ )
4096                 pthread_join (threads[idx], NULL);
4097 #else
4098         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
4099
4100         for( idx = 0; idx < cnt; idx++ )
4101                 CloseHandle(threads[idx]);
4102 #endif
4103         bt_poolaudit(mgr, "cache");
4104
4105         if( main )
4106                 bt_poolaudit(main, "main");
4107
4108         fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found);
4109         if( main )
4110                 fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
4111
4112         if( main )
4113                 bt_mgrclose (main);
4114         bt_mgrclose (mgr);
4115
4116         elapsed = getCpuTime(0) - start;
4117         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4118         elapsed = getCpuTime(1);
4119         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4120         elapsed = getCpuTime(2);
4121         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4122 }
4123
4124 BtKey *bt_key (BtPage page, uint slot)
4125 {
4126 return keyptr(page,slot);
4127 }
4128
4129 BtSlot *bt_slot (BtPage page, uint slot)
4130 {
4131 return slotptr(page,slot);
4132 }
4133 #endif  //STANDALONE