]> pd.if.org Git - btree/blob - threadskv10g.c
Several deletion bugs fixed
[btree] / threadskv10g.c
1 // btree version threadskv10g futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair re-entrant reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      traditional buffer pool manager
8 //      ACID batched key-value updates
9 //      redo log for failure recovery
10 //      LSM B-trees for write optimization
11 //      larger sized leaf pages than non-leaf
12 //      and LSM B-tree find & count operations
13
14 // 28 OCT 2014
15
16 // author: karl malbrain, malbrain@cal.berkeley.edu
17
18 /*
19 This work, including the source code, documentation
20 and related data, is placed into the public domain.
21
22 The orginal author is Karl Malbrain.
23
24 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
25 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
26 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
27 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
28 RESULTING FROM THE USE, MODIFICATION, OR
29 REDISTRIBUTION OF THIS SOFTWARE.
30 */
31
32 // Please see the project home page for documentation
33 // code.google.com/p/high-concurrency-btree
34
35 #define _FILE_OFFSET_BITS 64
36 #define _LARGEFILE64_SOURCE
37
38 #ifdef linux
39 #define _GNU_SOURCE
40 #include <xmmintrin.h>
41 #include <linux/futex.h>
42 #define SYS_futex 202
43 #endif
44
45 #ifdef unix
46 #include <unistd.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <fcntl.h>
50 #include <sys/time.h>
51 #include <sys/mman.h>
52 #include <errno.h>
53 #include <pthread.h>
54 #include <limits.h>
55 #else
56 #define WIN32_LEAN_AND_MEAN
57 #include <windows.h>
58 #include <stdio.h>
59 #include <stdlib.h>
60 #include <time.h>
61 #include <fcntl.h>
62 #include <process.h>
63 #include <intrin.h>
64 #endif
65
66 #include <memory.h>
67 #include <string.h>
68 #include <stddef.h>
69
70 typedef unsigned long long      uid;
71 typedef unsigned long long      logseqno;
72
73 #ifndef unix
74 typedef unsigned long long      off64_t;
75 typedef unsigned short          ushort;
76 typedef unsigned int            uint;
77 #endif
78
79 #define BT_ro 0x6f72    // ro
80 #define BT_rw 0x7772    // rw
81
82 #define BT_maxbits              26                                      // maximum page size in bits
83 #define BT_minbits              9                                       // minimum page size in bits
84 #define BT_minpage              (1 << BT_minbits)       // minimum page size
85 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
86
87 //  BTree page number constants
88 #define ALLOC_page              0       // allocation page
89 #define ROOT_page               1       // root of the btree
90 #define LEAF_page               2       // first page of leaves
91
92 //      Number of levels to create in a new BTree
93
94 #define MIN_lvl                 2
95
96 /*
97 There are six lock types for each node in four independent sets: 
98 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
99 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
100 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
101 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
102 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
103 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification. 
104 */
105
106 typedef enum{
107         BtLockAccess = 1,
108         BtLockDelete = 2,
109         BtLockRead   = 4,
110         BtLockWrite  = 8,
111         BtLockParent = 16,
112         BtLockLink   = 32
113 } BtLock;
114
115 typedef struct {
116   union {
117         struct {
118           volatile unsigned char xcl[1];
119           volatile unsigned char filler;
120           volatile ushort waiters[1];
121         } bits[1];
122         uint value[1];
123   };
124 } MutexLatch;
125
126 //      definition for reader/writer reentrant lock implementation
127
128 typedef struct {
129   MutexLatch xcl[1];
130   MutexLatch wrt[1];
131   ushort readers;
132   ushort dup;           // re-entrant locks
133   ushort tid;           // owner thread-no
134   ushort line;          // owner line #
135 } RWLock;
136
137 //  hash table entries
138
139 typedef struct {
140         MutexLatch latch[1];
141         uint entry;             // Latch table entry at head of chain
142 } BtHashEntry;
143
144 //      latch manager table structure
145
146 typedef struct {
147         uid page_no;                    // latch set page number
148         RWLock readwr[1];               // read/write page lock
149         RWLock access[1];               // Access Intent/Page delete
150         RWLock parent[1];               // Posting of fence key in parent
151         RWLock link[1];                 // left link update in progress
152         MutexLatch modify[1];   // modify entry lite latch
153         uint split;                             // right split page atomic insert
154         uint next;                              // next entry in hash table chain
155         uint prev;                              // prev entry in hash table chain
156         volatile ushort pin;    // number of accessing threads
157         unsigned char dirty;    // page in cache is dirty
158         unsigned char leaf;             // page in cache is a leaf
159 } BtLatchSet;
160
161 //      Define the length of the page record numbers
162
163 #define BtId 6
164
165 //      Page key slot definition.
166
167 //      Keys are marked dead, but remain on the page until
168 //      it cleanup is called. The fence key (highest key) for
169 //      a leaf page is always present, even after cleanup.
170
171 //      Slot types
172
173 //      In addition to the Unique keys that occupy slots
174 //      there are Librarian and Duplicate key
175 //      slots occupying the key slot array.
176
177 //      The Librarian slots are dead keys that
178 //      serve as filler, available to add new Unique
179 //      or Dup slots that are inserted into the B-tree.
180
181 //      The Duplicate slots have had their key bytes extended
182 //      by 6 bytes to contain a binary duplicate key uniqueifier.
183
184 typedef enum {
185         Unique,
186         Update,
187         Librarian,
188         Duplicate,
189         Delete
190 } BtSlotType;
191
192 typedef struct {
193         uint off:BT_maxbits;    // page offset for key start
194         uint type:3;                    // type of slot
195         uint dead:1;                    // set for deleted slot
196 } BtSlot;
197
198 //      The key structure occupies space at the upper end of
199 //      each page.  It's a length byte followed by the key
200 //      bytes.
201
202 typedef struct {
203         unsigned char len;              // this can be changed to a ushort or uint
204         unsigned char key[0];
205 } BtKey;
206
207 //      the value structure also occupies space at the upper
208 //      end of the page. Each key is immediately followed by a value.
209
210 typedef struct {
211         unsigned char len;              // this can be changed to a ushort or uint
212         unsigned char value[0];
213 } BtVal;
214
215 #define BT_maxkey       255             // maximum number of bytes in a key
216 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
217
218 //      The first part of an index page.
219 //      It is immediately followed
220 //      by the BtSlot array of keys.
221
222 typedef struct BtPage_ {
223         uint cnt;                                       // count of keys in page
224         uint act;                                       // count of active keys
225         uint min;                                       // next key offset
226         uint garbage;                           // page garbage in bytes
227         unsigned char lvl;                      // level of page
228         unsigned char free;                     // page is on free chain
229         unsigned char kill;                     // page is being deleted
230         unsigned char nopromote;        // page is being constructed
231         unsigned char filler1[6];       // padding to multiple of 8 bytes
232         unsigned char right[BtId];      // page number to right
233         unsigned char left[BtId];       // page number to left
234         unsigned char filler2[2];       // padding to multiple of 8 bytes
235         logseqno lsn;                           // log sequence number applied
236         uid page_no;                            // this page number
237 } *BtPage;
238
239 //  The loadpage interface object
240
241 typedef struct {
242         BtPage page;            // current page pointer
243         BtLatchSet *latch;      // current page latch set
244 } BtPageSet;
245
246 //      structure for latch manager on ALLOC_page
247
248 typedef struct {
249         struct BtPage_ alloc[1];                // next page_no in right ptr
250         unsigned char freechain[BtId];  // head of free page_nos chain
251         unsigned char leafchain[BtId];  // head of leaf page_nos chain
252         unsigned long long leafpages;   // number of active leaf pages
253         unsigned long long upperpages;  // number of active upper pages
254         uint redopages;                                 // number of redo pages in file
255         unsigned char leaf_xtra;                // leaf page size in xtra bits
256         unsigned char page_bits;                // base page size in bits
257 } BtPageZero;
258
259 //      The object structure for Btree access
260
261 typedef struct {
262         uint page_size;                         // base page size       
263         uint page_bits;                         // base page size in bits       
264         uint leaf_xtra;                         // leaf xtra bits       
265 #ifdef unix
266         int idx;
267 #else
268         HANDLE idx;
269 #endif
270         BtPageZero *pagezero;           // mapped allocation page
271         BtHashEntry *hashtable;         // the buffer pool hash table entries
272         BtHashEntry *leaftable;         // the buffer pool hash table entries
273         BtLatchSet *latchsets;          // mapped latch set from buffer pool
274         BtLatchSet *leafsets;           // mapped latch set from buffer pool
275         unsigned char *pagepool;        // mapped to the buffer pool pages
276         unsigned char *leafpool;        // mapped to the leaf pool pages
277         unsigned char *redobuff;        // mapped recovery buffer pointer
278         logseqno lsn, flushlsn;         // current & first lsn flushed
279         MutexLatch redo[1];                     // redo area lite latch
280         MutexLatch lock[1];                     // allocation area lite latch
281         ushort thread_no[1];            // highest thread number issued
282         ushort err_thread;                      // error thread number
283         uint nlatchpage;                        // size of buffer pool & latchsets
284         uint latchtotal;                        // number of page latch entries
285         uint latchhash;                         // number of latch hash table slots
286         uint latchvictim;                       // next latch entry to swap out
287         uint nleafpage;                         // size of leaf pool & leafsets
288         uint leaftotal;                         // number of leaf latch entries
289         uint leafhash;                          // number of leaf hash table slots
290         uint leafvictim;                        // next leaf entry to swap out
291         uint leafpromote;                       // next leaf entry to promote
292         uint redopage;                          // page number of redo buffer
293         uint redolast;                          // last msync size of recovery buff
294         uint redoend;                           // eof/end element in recovery buff
295         int err;                                        // last error
296         int line;                                       // last error line no
297         int found;                                      // number of keys found by delete
298         int reads, writes;                      // number of reads and writes
299         int type;                                       // type of LSM tree 0=cache, 1=main
300 #ifndef unix
301         HANDLE halloc;                          // allocation handle
302         HANDLE hpool;                           // buffer pool handle
303 #endif
304         unsigned char *page0;           // memory mapped pagezero of b-tree
305 } BtMgr;
306
307 typedef struct {
308         BtMgr *mgr;                                     // buffer manager for entire process
309         BtMgr *main;                            // buffer manager for main btree
310         BtPageSet cacheset[1];  // cached page frame for cache btree
311         BtPageSet mainset[1];   // cached page frame for main btree
312         uint cacheslot;                         // slot number in cacheset
313         uint mainslot;                          // slot number in mainset
314         ushort phase;                           // 1 = main btree 0 = cache btree 2 = both
315         ushort thread_no;                       // thread number
316         BtSlot *cachenode;
317         BtSlot *mainnode;
318         BtKey *cachekey;
319         BtKey *mainkey;
320         BtVal *cacheval;
321         BtVal *mainval;
322 } BtDb;
323
324 //      atomic txn structure
325
326 typedef struct {
327         logseqno reqlsn;        // redo log seq no required
328         uint entry:31;          // latch table entry number
329         uint reuse:1;           // reused previous page
330         uint slot;                      // slot on page
331 } AtomicTxn;
332
333 //      Catastrophic errors
334
335 typedef enum {
336         BTERR_ok = 0,
337         BTERR_struct,
338         BTERR_ovflw,
339         BTERR_lock,
340         BTERR_map,
341         BTERR_read,
342         BTERR_wrt,
343         BTERR_atomic,
344         BTERR_recovery
345 } BTERR;
346
347 #define CLOCK_bit 0x8000
348
349 // recovery manager entry types
350
351 typedef enum {
352         BTRM_eof = 0,   // rest of buffer is emtpy
353         BTRM_add,               // add a unique key-value to btree
354         BTRM_dup,               // add a duplicate key-value to btree
355         BTRM_del,               // delete a key-value from btree
356         BTRM_upd,               // update a key with a new value
357         BTRM_new,               // allocate a new empty page
358         BTRM_old                // reuse an old empty page
359 } BTRM;
360
361 // recovery manager entry
362 //      structure followed by BtKey & BtVal
363
364 typedef struct {
365         logseqno reqlsn;        // log sequence number required
366         logseqno lsn;           // log sequence number for entry
367         uint len;                       // length of entry
368         unsigned char type;     // type of entry
369         unsigned char lvl;      // level of btree entry pertains to
370 } BtLogHdr;
371
372 // B-Tree functions
373
374 extern void bt_close (BtDb *bt);
375 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
376 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
377 extern BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
378 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
379 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line);
380 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no);
381 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no);
382
383 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
384
385 extern BTERR bt_startkey (BtDb *db, unsigned char *key, uint len);
386 extern BTERR bt_nextkey (BtDb *bt);
387
388 extern uint bt_lastkey (BtDb *bt);
389 extern uint bt_prevkey (BtDb *bt);
390
391 //      manager functions
392 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize, uint leafpool, uint redopages);
393 extern void bt_mgrclose (BtMgr *mgr);
394 extern logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no);
395 extern logseqno bt_txnredo (BtMgr *mgr, BtPage page, ushort thread_no);
396
397 //      atomic transaction functions
398 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no);
399 BTERR bt_promote (BtDb *bt);
400
401 //  The page is allocated from low and hi ends.
402 //  The key slots are allocated from the bottom,
403 //      while the text and value of the key
404 //  are allocated from the top.  When the two
405 //  areas meet, the page is split into two.
406
407 //  A key consists of a length byte, two bytes of
408 //  index number (0 - 65535), and up to 253 bytes
409 //  of key value.
410
411 //  Associated with each key is a value byte string
412 //      containing any value desired.
413
414 //  The b-tree root is always located at page 1.
415 //      The first leaf page of level zero is always
416 //      located on page 2.
417
418 //      The b-tree pages are linked with next
419 //      pointers to facilitate enumerators,
420 //      and provide for concurrency.
421
422 //      When to root page fills, it is split in two and
423 //      the tree height is raised by a new root at page
424 //      one with two keys.
425
426 //      Deleted keys are marked with a dead bit until
427 //      page cleanup. The fence key for a leaf node is
428 //      always present
429
430 //  To achieve maximum concurrency one page is locked at a time
431 //  as the tree is traversed to find leaf key in question. The right
432 //  page numbers are used in cases where the page is being split,
433 //      or consolidated.
434
435 //  Page 0 is dedicated to lock for new page extensions,
436 //      and chains empty pages together for reuse. It also
437 //      contains the latch manager hash table.
438
439 //      The ParentModification lock on a node is obtained to serialize posting
440 //      or changing the fence key for a node.
441
442 //      Empty pages are chained together through the ALLOC page and reused.
443
444 //      Access macros to address slot and key values from the page
445 //      Page slots use 1 based indexing.
446
447 #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
448 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
449 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
450
451 void bt_putid(unsigned char *dest, uid id)
452 {
453 int i = BtId;
454
455         while( i-- )
456                 dest[i] = (unsigned char)id, id >>= 8;
457 }
458
459 uid bt_getid(unsigned char *src)
460 {
461 uid id = 0;
462 int i;
463
464         for( i = 0; i < BtId; i++ )
465                 id <<= 8, id |= *src++; 
466
467         return id;
468 }
469
470 //      lite weight spin lock Latch Manager
471
472 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
473 {
474         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
475 }
476
477 void bt_mutexlock(MutexLatch *latch)
478 {
479 uint idx, waited = 0;
480 MutexLatch prev[1];
481
482  while( 1 ) {
483   for( idx = 0; idx < 100; idx++ ) {
484         *prev->value = __sync_fetch_and_or (latch->value, 1);
485         if( !*prev->bits->xcl ) {
486           if( waited )
487                 __sync_fetch_and_sub (latch->bits->waiters, 1);
488           return;
489         }
490   }
491
492   if( !waited ) {
493         __sync_fetch_and_add (latch->bits->waiters, 1);
494         *prev->bits->waiters += 1;
495         waited++;
496   }
497
498   sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
499  }
500 }
501
502 int bt_mutextry(MutexLatch *latch)
503 {
504         return !__sync_lock_test_and_set (latch->bits->xcl, 1);
505 }
506
507 void bt_releasemutex(MutexLatch *latch)
508 {
509 MutexLatch prev[1];
510
511         *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
512
513         if( *prev->bits->waiters )
514                 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
515 }
516
517 //      reader/writer lock implementation
518
519 void WriteLock (RWLock *lock, ushort tid, uint line)
520 {
521         if( lock->tid == tid ) {
522                 lock->dup++;
523                 return;
524         }
525         bt_mutexlock (lock->xcl);
526         bt_mutexlock (lock->wrt);
527         bt_releasemutex (lock->xcl);
528
529         lock->line = line;
530         lock->tid = tid;
531 }
532
533 void WriteRelease (RWLock *lock)
534 {
535         if( lock->dup ) {
536                 lock->dup--;
537                 return;
538         }
539         lock->tid = 0;
540         bt_releasemutex (lock->wrt);
541 }
542
543 void ReadLock (RWLock *lock, ushort tid)
544 {
545         bt_mutexlock (lock->xcl);
546
547         if( !__sync_fetch_and_add (&lock->readers, 1) )
548                 bt_mutexlock (lock->wrt);
549
550         bt_releasemutex (lock->xcl);
551 }
552
553 void ReadRelease (RWLock *lock)
554 {
555         if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
556                 bt_releasemutex (lock->wrt);
557 }
558
559 //      recovery manager -- flush dirty pages
560
561 void bt_flushlsn (BtMgr *mgr, ushort thread_no)
562 {
563 uint cnt3 = 0, cnt2 = 0, cnt = 0;
564 uint entry, segment;
565 BtLatchSet *latch;
566 BtPage page;
567
568   //    flush dirty pool pages to the btree
569
570 fprintf(stderr, "Start flushlsn  ");
571   for( entry = 1; entry < mgr->latchtotal; entry++ ) {
572         page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
573         latch = mgr->latchsets + entry;
574         bt_mutexlock (latch->modify);
575         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
576
577         if( latch->dirty ) {
578           bt_writepage(mgr, page, latch->page_no, 0);
579           latch->dirty = 0, cnt++;
580         }
581 if( latch->pin & ~CLOCK_bit )
582 cnt2++;
583         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
584         bt_releasemutex (latch->modify);
585   }
586   for( entry = 1; entry < mgr->leaftotal; entry++ ) {
587         page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
588         latch = mgr->leafsets + entry;
589         bt_mutexlock (latch->modify);
590         bt_lockpage(BtLockRead, latch, thread_no, __LINE__);
591
592         if( latch->dirty ) {
593           bt_writepage(mgr, page, latch->page_no, 1);
594           latch->dirty = 0, cnt++;
595         }
596 if( latch->pin & ~CLOCK_bit )
597 cnt2++;
598         bt_unlockpage(BtLockRead, latch, thread_no, __LINE__);
599         bt_releasemutex (latch->modify);
600   }
601 fprintf(stderr, "End flushlsn %d pages  %d pinned\n", cnt, cnt2);
602 }
603
604 //      recovery manager -- process current recovery buff on startup
605 //      this won't do much if previous session was properly closed.
606
607 BTERR bt_recoveryredo (BtMgr *mgr)
608 {
609 BtLogHdr *hdr, *eof;
610 uint offset = 0;
611 BtKey *key;
612 BtVal *val;
613
614   hdr = (BtLogHdr *)mgr->redobuff;
615   mgr->flushlsn = hdr->lsn;
616
617   while( 1 ) {
618         hdr = (BtLogHdr *)(mgr->redobuff + offset);
619         switch( hdr->type ) {
620         case BTRM_eof:
621                 mgr->lsn = hdr->lsn;
622                 return 0;
623         case BTRM_add:          // add a unique key-value to btree
624         
625         case BTRM_dup:          // add a duplicate key-value to btree
626         case BTRM_del:          // delete a key-value from btree
627         case BTRM_upd:          // update a key with a new value
628         case BTRM_new:          // allocate a new empty page
629         case BTRM_old:          // reuse an old empty page
630                 return 0;
631         }
632   }
633 }
634
635 //      recovery manager -- append new entry to recovery log
636 //      flush dirty pages to disk when it overflows.
637
638 logseqno bt_newredo (BtMgr *mgr, BTRM type, int lvl, BtKey *key, BtVal *val, ushort thread_no)
639 {
640 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
641 uint amt = sizeof(BtLogHdr);
642 BtLogHdr *hdr, *eof;
643 uint last, end;
644
645         bt_mutexlock (mgr->redo);
646
647         if( key )
648           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
649
650         //      see if new entry fits in buffer
651         //      flush and reset if it doesn't
652
653         if( amt > size - mgr->redoend ) {
654           mgr->flushlsn = mgr->lsn;
655           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
656                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
657           mgr->redolast = 0;
658           mgr->redoend = 0;
659           bt_flushlsn(mgr, thread_no);
660         }
661
662         //      fill in new entry & either eof or end block
663
664         hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
665
666         hdr->len = amt;
667         hdr->type = type;
668         hdr->lvl = lvl;
669         hdr->lsn = ++mgr->lsn;
670
671         mgr->redoend += amt;
672
673         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
674         memset (eof, 0, sizeof(BtLogHdr));
675
676         //  fill in key and value
677
678         if( key ) {
679           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
680           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
681         }
682
683         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
684         memset (eof, 0, sizeof(BtLogHdr));
685         eof->lsn = mgr->lsn;
686
687         last = mgr->redolast & ~0xfff;
688         end = mgr->redoend;
689
690         if( end - last + sizeof(BtLogHdr) >= 32768 )
691           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
692                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
693           else
694                 mgr->redolast = end;
695
696         bt_releasemutex(mgr->redo);
697         return hdr->lsn;
698 }
699
700 //      recovery manager -- append transaction to recovery log
701 //      flush dirty pages to disk when it overflows.
702
703 logseqno bt_txnredo (BtMgr *mgr, BtPage source, ushort thread_no)
704 {
705 uint size = mgr->page_size * mgr->pagezero->redopages - sizeof(BtLogHdr);
706 uint amt = 0, src, type;
707 BtLogHdr *hdr, *eof;
708 uint last, end;
709 logseqno lsn;
710 BtKey *key;
711 BtVal *val;
712
713         //      determine amount of redo recovery log space required
714
715         for( src = 0; src++ < source->cnt; ) {
716           key = keyptr(source,src);
717           val = valptr(source,src);
718           amt += key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
719           amt += sizeof(BtLogHdr);
720         }
721
722         bt_mutexlock (mgr->redo);
723
724         //      see if new entry fits in buffer
725         //      flush and reset if it doesn't
726
727         if( amt > size - mgr->redoend ) {
728           mgr->flushlsn = mgr->lsn;
729           if( msync (mgr->redobuff + (mgr->redolast & ~0xfff), mgr->redoend - (mgr->redolast & ~0xfff) + sizeof(BtLogHdr), MS_SYNC) < 0 )
730                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
731           mgr->redolast = 0;
732           mgr->redoend = 0;
733           bt_flushlsn (mgr, thread_no);
734         }
735
736         //      assign new lsn to transaction
737
738         lsn = ++mgr->lsn;
739
740         //      fill in new entries
741
742         for( src = 0; src++ < source->cnt; ) {
743           key = keyptr(source, src);
744           val = valptr(source, src);
745
746           switch( slotptr(source, src)->type ) {
747           case Unique:
748                 type = BTRM_add;
749                 break;
750           case Duplicate:
751                 type = BTRM_dup;
752                 break;
753           case Delete:
754                 type = BTRM_del;
755                 break;
756           }
757
758           amt = key->len + val->len + sizeof(BtKey) + sizeof(BtVal);
759           amt += sizeof(BtLogHdr);
760
761           hdr = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
762           hdr->len = amt;
763           hdr->type = type;
764           hdr->lsn = lsn;
765           hdr->lvl = 0;
766
767           //  fill in key and value
768
769           memcpy ((unsigned char *)(hdr + 1), key, key->len + sizeof(BtKey));
770           memcpy ((unsigned char *)(hdr + 1) + key->len + sizeof(BtKey), val, val->len + sizeof(BtVal));
771
772           mgr->redoend += amt;
773         }
774
775         eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
776         memset (eof, 0, sizeof(BtLogHdr));
777         eof->lsn = lsn;
778
779         last = mgr->redolast & ~0xfff;
780         end = mgr->redoend;
781
782         if( end - last + sizeof(BtLogHdr) >= 32768 )
783           if( msync (mgr->redobuff + last, end - last + sizeof(BtLogHdr), MS_SYNC) < 0 )
784                 fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
785           else
786                 mgr->redolast = end;
787
788         bt_releasemutex(mgr->redo);
789         return lsn;
790 }
791
792 //      read page into buffer pool from permanent location in Btree file
793
794 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
795 {
796 uint page_size = mgr->page_size;
797
798   if( leaf )
799         page_size <<= mgr->leaf_xtra;
800
801   if( pread(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
802         return mgr->err = BTERR_read;
803
804   __sync_fetch_and_add (&mgr->reads, 1);
805   return 0;
806 }
807
808 //      write page to permanent location in Btree file
809
810 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
811 {
812 uint page_size = mgr->page_size;
813
814   if( leaf )
815         page_size <<= mgr->leaf_xtra;
816
817   if( pwrite(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
818         return mgr->err = BTERR_wrt;
819
820   __sync_fetch_and_add (&mgr->writes, 1);
821   return 0;
822 }
823
824 //      set CLOCK bit in latch
825 //      decrement pin count
826
827 void bt_unpinlatch (BtLatchSet *latch, uint dirty, ushort thread_no, uint line)
828 {
829         bt_mutexlock(latch->modify);
830         if( dirty )
831                 latch->dirty = 1;
832         latch->pin |= CLOCK_bit;
833         latch->pin--;
834         bt_releasemutex(latch->modify);
835 }
836
837 //  return the btree cached page address
838
839 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
840 {
841 uid entry = latch - (latch->leaf ? mgr->leafsets : mgr->latchsets);
842 BtPage page;
843
844   if( latch->leaf )
845         page = (BtPage)((entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
846   else
847         page = (BtPage)((entry << mgr->page_bits) + mgr->pagepool);
848
849   return page;
850 }
851
852 //  return next available leaf entry
853 //        and with latch entry locked
854
855 uint bt_availleaf (BtMgr *mgr)
856 {
857 BtLatchSet *latch;
858 uint entry;
859
860   while( 1 ) {
861 #ifdef unix
862         entry = __sync_fetch_and_add (&mgr->leafvictim, 1) + 1;
863 #else
864         entry = _InterlockedIncrement (&mgr->leafvictim);
865 #endif
866         entry %= mgr->leaftotal;
867
868         if( !entry )
869                 continue;
870
871         latch = mgr->leafsets + entry;
872
873         if( !bt_mutextry(latch->modify) )
874                 continue;
875
876         //  return this entry if it is not pinned
877
878         if( !latch->pin )
879                 return entry;
880
881         //      if the CLOCK bit is set
882         //      reset it to zero.
883
884         latch->pin &= ~CLOCK_bit;
885         bt_releasemutex(latch->modify);
886   }
887 }
888
889 //  return next available latch entry
890 //        and with latch entry locked
891
892 uint bt_availnext (BtMgr *mgr)
893 {
894 BtLatchSet *latch;
895 uint entry;
896
897   while( 1 ) {
898 #ifdef unix
899         entry = __sync_fetch_and_add (&mgr->latchvictim, 1) + 1;
900 #else
901         entry = _InterlockedIncrement (&mgr->latchvictim);
902 #endif
903         entry %= mgr->latchtotal;
904
905         if( !entry )
906                 continue;
907
908         latch = mgr->latchsets + entry;
909
910         if( !bt_mutextry(latch->modify) )
911                 continue;
912
913         //  return this entry if it is not pinned
914
915         if( !latch->pin )
916                 return entry;
917
918         //      if the CLOCK bit is set
919         //      reset it to zero.
920
921         latch->pin &= ~CLOCK_bit;
922         bt_releasemutex(latch->modify);
923   }
924 }
925
926 //      pin leaf in leaf buffer pool
927 //      return with latchset pinned
928
929 BtLatchSet *bt_pinleaf (BtMgr *mgr, uid page_no, ushort thread_no)
930 {
931 uint hashidx = page_no % mgr->leafhash;
932 uint entry, oldidx;
933 BtLatchSet *latch;
934 BtPage page;
935
936   //  try to find our entry
937
938   bt_mutexlock(mgr->leaftable[hashidx].latch);
939
940   if( entry = mgr->leaftable[hashidx].entry )
941    do {
942         latch = mgr->leafsets + entry;
943
944         if( page_no == latch->page_no )
945                 break;
946    } while( entry = latch->next );
947
948   //  found our entry: increment pin
949
950   if( entry ) {
951         latch = mgr->leafsets + entry;
952         bt_mutexlock(latch->modify);
953         latch->pin |= CLOCK_bit;
954         latch->pin++;
955
956         bt_releasemutex(latch->modify);
957         bt_releasemutex(mgr->leaftable[hashidx].latch);
958         return latch;
959   }
960
961   //  find and reuse unpinned entry
962
963 trynext:
964   entry = bt_availleaf (mgr);
965   latch = mgr->leafsets + entry;
966
967   page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
968   oldidx = latch->page_no % mgr->leafhash;
969
970   //  skip over this entry if old latch is not available
971
972   if( latch->page_no )
973    if( oldidx != hashidx )
974     if( !bt_mutextry (mgr->leaftable[oldidx].latch) ) {
975           bt_releasemutex(latch->modify);
976           goto trynext;
977         }
978
979   //  update permanent page area in btree from buffer pool
980   //    no read-lock is required since page is not pinned.
981
982   if( latch->dirty )
983         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 1) )
984           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
985         else
986           latch->dirty = 0;
987
988   //  if latch is on a different hash chain
989   //    unlink from the old page_no chain
990
991   if( latch->page_no )
992    if( oldidx != hashidx ) {
993         if( latch->prev )
994           mgr->leafsets[latch->prev].next = latch->next;
995         else
996           mgr->leaftable[oldidx].entry = latch->next;
997
998         if( latch->next )
999           mgr->leafsets[latch->next].prev = latch->prev;
1000
1001     bt_releasemutex (mgr->leaftable[oldidx].latch);
1002    }
1003
1004   if( bt_readpage (mgr, page, page_no, 1) )
1005         return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1006
1007   //  link page as head of hash table chain
1008   //  if this is a never before used entry,
1009   //  or it was previously on a different
1010   //  hash table chain. Otherwise, just
1011   //  leave it in its current hash table
1012   //  chain position.
1013
1014   if( !latch->page_no || hashidx != oldidx ) {
1015         if( latch->next = mgr->leaftable[hashidx].entry )
1016           mgr->leafsets[latch->next].prev = entry;
1017
1018         mgr->leaftable[hashidx].entry = entry;
1019     latch->prev = 0;
1020   }
1021
1022   //  fill in latch structure
1023
1024   latch->pin = CLOCK_bit | 1;
1025   latch->page_no = page_no;
1026   latch->leaf = 1;
1027
1028   bt_releasemutex (latch->modify);
1029   bt_releasemutex (mgr->leaftable[hashidx].latch);
1030   return latch;
1031 }
1032
1033 //      pin page in non-leaf buffer pool
1034 //      return with latchset pinned
1035
1036 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no, ushort thread_no)
1037 {
1038 uint hashidx = page_no % mgr->latchhash;
1039 uint entry, oldidx;
1040 BtLatchSet *latch;
1041 BtPage page;
1042
1043   //  try to find our entry
1044
1045   bt_mutexlock(mgr->hashtable[hashidx].latch);
1046
1047   if( entry = mgr->hashtable[hashidx].entry ) do
1048   {
1049         latch = mgr->latchsets + entry;
1050
1051         if( page_no == latch->page_no )
1052                 break;
1053   } while( entry = latch->next );
1054
1055   //  found our entry: increment pin
1056
1057   if( entry ) {
1058         latch = mgr->latchsets + entry;
1059         bt_mutexlock(latch->modify);
1060         latch->pin |= CLOCK_bit;
1061         latch->pin++;
1062         bt_releasemutex(latch->modify);
1063         bt_releasemutex(mgr->hashtable[hashidx].latch);
1064         return latch;
1065   }
1066
1067   //  find and reuse unpinned entry
1068
1069 trynext:
1070   entry = bt_availnext (mgr);
1071   latch = mgr->latchsets + entry;
1072
1073   page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1074   oldidx = latch->page_no % mgr->latchhash;
1075
1076   //  skip over this entry if latch not available
1077
1078   if( latch->page_no )
1079    if( oldidx != hashidx )
1080     if( !bt_mutextry (mgr->hashtable[oldidx].latch) ) {
1081           bt_releasemutex(latch->modify);
1082           goto trynext;
1083         }
1084
1085   //  update permanent page area in btree from buffer pool
1086   //    no read-lock is required since page is not pinned.
1087
1088   if( latch->dirty )
1089         if( mgr->err = bt_writepage (mgr, page, latch->page_no, 0) )
1090           return mgr->line = __LINE__, mgr->err_thread = thread_no, NULL;
1091         else
1092           latch->dirty = 0;
1093
1094   //  if latch is on a different hash chain
1095   //    unlink from the old page_no chain
1096
1097   if( latch->page_no )
1098    if( oldidx != hashidx ) {
1099         if( latch->prev )
1100           mgr->latchsets[latch->prev].next = latch->next;
1101         else
1102           mgr->hashtable[oldidx].entry = latch->next;
1103
1104         if( latch->next )
1105           mgr->latchsets[latch->next].prev = latch->prev;
1106
1107     bt_releasemutex (mgr->hashtable[oldidx].latch);
1108    }
1109
1110   if( bt_readpage (mgr, page, page_no, 0) )
1111         return mgr->line = __LINE__, NULL;
1112
1113   //  link page as head of hash table chain
1114   //  if this is a never before used entry,
1115   //  or it was previously on a different
1116   //  hash table chain. Otherwise, just
1117   //  leave it in its current hash table
1118   //  chain position.
1119
1120   if( !latch->page_no || hashidx != oldidx ) {
1121         if( latch->next = mgr->hashtable[hashidx].entry )
1122           mgr->latchsets[latch->next].prev = entry;
1123
1124         mgr->hashtable[hashidx].entry = entry;
1125     latch->prev = 0;
1126   }
1127
1128   //  fill in latch structure
1129
1130   latch->pin = CLOCK_bit | 1;
1131   latch->page_no = page_no;
1132   latch->leaf = 0;
1133
1134   bt_releasemutex (latch->modify);
1135   bt_releasemutex (mgr->hashtable[hashidx].latch);
1136   return latch;
1137 }
1138   
1139 void bt_mgrclose (BtMgr *mgr)
1140 {
1141 char *name = mgr->type ? "Main" : "Cache";
1142 BtLatchSet *latch;
1143 BtLogHdr *eof;
1144 uint num = 0;
1145 BtPage page;
1146 uint entry;
1147
1148         //      flush previously written dirty pages
1149         //      and write recovery buffer to disk
1150
1151         fdatasync (mgr->idx);
1152
1153         if( mgr->redoend ) {
1154                 eof = (BtLogHdr *)(mgr->redobuff + mgr->redoend);
1155                 memset (eof, 0, sizeof(BtLogHdr));
1156         }
1157
1158         //      write remaining dirty pool pages to the btree
1159
1160         for( entry = 1; entry < mgr->latchtotal; entry++ ) {
1161                 page = (BtPage)(((uid)entry << mgr->page_bits) + mgr->pagepool);
1162                 latch = mgr->latchsets + entry;
1163
1164                 if( latch->dirty ) {
1165                         bt_writepage(mgr, page, latch->page_no, 0);
1166                         latch->dirty = 0, num++;
1167                 }
1168         }
1169
1170         if( num )
1171           fprintf(stderr, "%s %d non-leaf buffer pool pages flushed\n", name, num);
1172
1173         num = 0;
1174
1175         //      write remaining dirty leaf pages to the btree
1176
1177         for( entry = 1; entry < mgr->leaftotal; entry++ ) {
1178                 page = (BtPage)(((uid)entry << mgr->page_bits << mgr->leaf_xtra) + mgr->leafpool);
1179                 latch = mgr->leafsets + entry;
1180
1181                 if( latch->dirty ) {
1182                         bt_writepage(mgr, page, latch->page_no, 1);
1183                         latch->dirty = 0, num++;
1184                 }
1185         }
1186
1187         if( num )
1188           fprintf(stderr, "%s %d leaf buffer pool pages flushed\n", name, num);
1189
1190         //      clear redo recovery buffer on disk.
1191
1192         if( mgr->pagezero->redopages ) {
1193                 eof = (BtLogHdr *)mgr->redobuff;
1194                 memset (eof, 0, sizeof(BtLogHdr));
1195                 eof->lsn = mgr->lsn;
1196                 if( msync (mgr->redobuff, 4096, MS_SYNC) < 0 )
1197                   fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1198         }
1199
1200 #ifdef unix
1201         munmap (mgr->page0, (uid)(mgr->redopage + mgr->pagezero->redopages) << mgr->page_bits);
1202
1203         munmap (mgr->pagepool, (uid)mgr->nlatchpage << mgr->page_bits);
1204         munmap (mgr->leafpool, (uid)mgr->nleafpage << mgr->page_bits);
1205         munmap (mgr->pagezero, mgr->page_size);
1206 #else
1207         FlushViewOfFile(mgr->pagezero, 0);
1208         UnmapViewOfFile(mgr->pagezero);
1209         UnmapViewOfFile(mgr->pagepool);
1210         CloseHandle(mgr->halloc);
1211         CloseHandle(mgr->hpool);
1212 #endif
1213 #ifdef unix
1214         close (mgr->idx);
1215         free (mgr);
1216 #else
1217         FlushFileBuffers(mgr->idx);
1218         CloseHandle(mgr->idx);
1219         GlobalFree (mgr);
1220 #endif
1221 }
1222
1223 //      close and release memory
1224
1225 void bt_close (BtDb *bt)
1226 {
1227         free (bt);
1228 }
1229
1230 //  open/create new btree buffer manager
1231
1232 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
1233 //              size of page pool (e.g. 300) and leaf pool (e.g. 4000)
1234
1235 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax, uint leafmax, uint redopages)
1236 {
1237 uint lvl, attr, last, slot, idx;
1238 uint nlatchpage, latchhash;
1239 uint nleafpage, leafhash;
1240 unsigned char value[BtId];
1241 int flag, initit = 0;
1242 BtPageZero *pagezero;
1243 BtLatchSet *latch;
1244 off64_t size;
1245 uint amt[1];
1246 BtMgr* mgr;
1247 BtKey* key;
1248 BtVal *val;
1249
1250         // determine sanity of page size and buffer pool
1251
1252         if( leafxtra + pagebits > BT_maxbits )
1253                 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
1254
1255         if( pagebits < BT_minbits )
1256                 fprintf (stderr, "pagebits < minbits\n"), exit(1);
1257
1258 #ifdef unix
1259         mgr = calloc (1, sizeof(BtMgr));
1260
1261         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
1262
1263         if( mgr->idx == -1 ) {
1264                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1265                 return free(mgr), NULL;
1266         }
1267 #else
1268         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
1269         attr = FILE_ATTRIBUTE_NORMAL;
1270         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
1271
1272         if( mgr->idx == INVALID_HANDLE_VALUE ) {
1273                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
1274                 return GlobalFree(mgr), NULL;
1275         }
1276 #endif
1277
1278 #ifdef unix
1279         pagezero = valloc (BT_maxpage);
1280         *amt = 0;
1281
1282         // read minimum page size to get root info
1283         //      to support raw disk partition files
1284         //      check if page_bits == 0 on the disk.
1285
1286         if( size = lseek (mgr->idx, 0L, 2) )
1287                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
1288                         if( pagezero->page_bits ) {
1289                                 pagebits = pagezero->page_bits;
1290                                 leafxtra = pagezero->leaf_xtra;
1291                                 redopages = pagezero->redopages;
1292                         } else
1293                                 initit = 1;
1294                 else
1295                         return free(mgr), free(pagezero), NULL;
1296         else
1297                 initit = 1;
1298 #else
1299         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
1300         size = GetFileSize(mgr->idx, amt);
1301
1302         if( size || *amt ) {
1303                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
1304                         return bt_mgrclose (mgr), NULL;
1305                 pagebits = pagezero->page_bits;
1306                 leafxtra = pagezero->leaf_xtra;
1307                 redopages = pagezero->redopages;
1308         } else
1309                 initit = 1;
1310 #endif
1311
1312         mgr->page_size = 1 << pagebits;
1313         mgr->page_bits = pagebits;
1314         mgr->leaf_xtra = leafxtra;
1315
1316         //  calculate number of latch hash table entries
1317
1318         mgr->nlatchpage = ((uid)nodemax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1319
1320         mgr->nlatchpage += nodemax;             // size of the buffer pool in pages
1321         mgr->nlatchpage += (sizeof(BtLatchSet) * nodemax + mgr->page_size - 1) >> mgr->page_bits;
1322         mgr->latchtotal = nodemax;
1323
1324         //  calculate number of leaf hash table entries
1325
1326         mgr->nleafpage = ((uid)leafmax/16 * sizeof(BtHashEntry) + mgr->page_size - 1) >> mgr->page_bits;
1327
1328         mgr->nleafpage += leafmax << leafxtra;          // size of the leaf pool in pages
1329         mgr->nleafpage += (sizeof(BtLatchSet) * leafmax + mgr->page_size - 1) >> mgr->page_bits;
1330         mgr->leaftotal = leafmax;
1331
1332         mgr->redopage = LEAF_page + (1 << leafxtra);
1333
1334         if( !initit )
1335                 goto mgrlatch;
1336
1337         // initialize an empty b-tree with latch page, root page, page of leaves
1338         // and page(s) of latches and page pool cache
1339
1340         ftruncate (mgr->idx, (mgr->redopage + pagezero->redopages) << mgr->page_bits);
1341         memset (pagezero, 0, 1 << pagebits);
1342         pagezero->alloc->lvl = MIN_lvl - 1;
1343         pagezero->redopages = redopages;
1344         pagezero->page_bits = mgr->page_bits;
1345         pagezero->leaf_xtra = leafxtra;
1346
1347         bt_putid(pagezero->alloc->right, mgr->redopage + pagezero->redopages);
1348         pagezero->upperpages = 1;
1349         pagezero->leafpages = 1;
1350
1351         //  initialize left-most LEAF page in
1352         //      alloc->left and count of active leaf pages.
1353
1354         bt_putid (pagezero->alloc->left, LEAF_page);
1355
1356         if( bt_writepage (mgr, pagezero->alloc, 0, 0) ) {
1357                 fprintf (stderr, "Unable to create btree page zero\n");
1358                 return bt_mgrclose (mgr), NULL;
1359         }
1360
1361         memset (pagezero, 0, 1 << pagebits);
1362
1363         for( lvl=MIN_lvl; lvl--; ) {
1364         BtSlot *node = slotptr(pagezero->alloc, 1);
1365                 node->off = mgr->page_size;
1366
1367                 if( !lvl )
1368                         node->off <<= mgr->leaf_xtra;
1369
1370                 node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
1371                 node->type = Librarian;
1372                 node++->dead = 1;
1373
1374                 node->off = node[-1].off;
1375                 key = keyptr(pagezero->alloc, 2);
1376                 key = keyptr(pagezero->alloc, 1);
1377                 key->len = 2;           // create stopper key
1378                 key->key[0] = 0xff;
1379                 key->key[1] = 0xff;
1380
1381                 bt_putid(value, MIN_lvl - lvl + 1);
1382                 val = valptr(pagezero->alloc, 1);
1383                 val->len = lvl ? BtId : 0;
1384                 memcpy (val->value, value, val->len);
1385
1386                 pagezero->alloc->min = node->off;
1387                 pagezero->alloc->lvl = lvl;
1388                 pagezero->alloc->cnt = 2;
1389                 pagezero->alloc->act = 1;
1390                 pagezero->alloc->page_no = MIN_lvl - lvl;
1391
1392                 if( bt_writepage (mgr, pagezero->alloc, MIN_lvl - lvl, !lvl) ) {
1393                         fprintf (stderr, "Unable to create btree page\n");
1394                         return bt_mgrclose (mgr), NULL;
1395                 }
1396         }
1397
1398 mgrlatch:
1399 #ifdef unix
1400         free (pagezero);
1401 #else
1402         VirtualFree (pagezero, 0, MEM_RELEASE);
1403 #endif
1404
1405         // mlock the first segment of 64K pages
1406
1407         flag = PROT_READ | PROT_WRITE;
1408         mgr->page0 = mmap (0, (uid)(mgr->redopage + redopages) << mgr->page_bits, flag, MAP_SHARED, mgr->idx, 0);
1409
1410         if( mgr->page0 == MAP_FAILED ) {
1411                 fprintf (stderr, "Unable to mmap pagezero btree segment, error = %d\n", errno);
1412                 return bt_mgrclose (mgr), NULL;
1413         }
1414
1415         mgr->pagezero = (BtPageZero *)mgr->page0;
1416         mlock (mgr->pagezero, mgr->page_size);
1417
1418         mgr->redobuff = mgr->page0 + (mgr->redopage << mgr->page_bits);
1419         mlock (mgr->redobuff, redopages << mgr->page_bits);
1420
1421         //      allocate pool buffers
1422
1423         mgr->pagepool = mmap (0, (uid)mgr->nlatchpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1424         if( mgr->pagepool == MAP_FAILED ) {
1425                 fprintf (stderr, "Unable to mmap anonymous buffer pool pages, error = %d\n", errno);
1426                 return bt_mgrclose (mgr), NULL;
1427         }
1428
1429         mgr->leafpool = mmap (0, (uid)mgr->nleafpage << mgr->page_bits, flag, MAP_ANONYMOUS | MAP_SHARED, -1, 0);
1430         if( mgr->leafpool == MAP_FAILED ) {
1431                 fprintf (stderr, "Unable to mmap anonymous leaf pool pages, error = %d\n", errno);
1432                 return bt_mgrclose (mgr), NULL;
1433         }
1434
1435         mgr->latchsets = (BtLatchSet *)(mgr->pagepool + ((uid)mgr->latchtotal << mgr->page_bits));
1436         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->latchtotal);
1437         mgr->latchhash = (mgr->pagepool + ((uid)mgr->nlatchpage << mgr->page_bits) - (unsigned char *)mgr->hashtable) / sizeof(BtHashEntry);
1438
1439         mgr->leafsets = (BtLatchSet *)(mgr->leafpool + ((uid)mgr->leaftotal << mgr->page_bits << mgr->leaf_xtra));
1440         mgr->leaftable = (BtHashEntry *)(mgr->leafsets + mgr->leaftotal);
1441         mgr->leafhash = (mgr->leafpool + ((uid)mgr->nleafpage << mgr->page_bits) - (unsigned char *)mgr->leaftable) / sizeof(BtHashEntry);
1442
1443         for( idx = 1; idx < mgr->leaftotal; idx++ ) {
1444                 latch = mgr->leafsets + idx;
1445                 latch->leaf = 1;
1446         }
1447
1448         return mgr;
1449 }
1450
1451 //      open BTree access method
1452 //      based on buffer manager
1453
1454 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
1455 {
1456 BtDb *bt = malloc (sizeof(*bt));
1457
1458         memset (bt, 0, sizeof(*bt));
1459         bt->main = main;
1460         bt->mgr = mgr;
1461 #ifdef unix
1462         bt->thread_no = __sync_fetch_and_add (mgr->thread_no, 1) + 1;
1463 #else
1464         bt->thread_no = _InterlockedIncrement16(mgr->thread_no, 1);
1465 #endif
1466         return bt;
1467 }
1468
1469 //  compare two keys, return > 0, = 0, or < 0
1470 //  =0: keys are same
1471 //  -1: key2 > key1
1472 //  +1: key2 < key1
1473 //  as the comparison value
1474
1475 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
1476 {
1477 uint len1 = key1->len;
1478 int ans;
1479
1480         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
1481                 return ans;
1482
1483         if( len1 > len2 )
1484                 return 1;
1485         if( len1 < len2 )
1486                 return -1;
1487
1488         return 0;
1489 }
1490
1491 // place write, read, or parent lock on requested page_no.
1492
1493 void bt_lockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1494 {
1495         switch( mode ) {
1496         case BtLockRead:
1497                 ReadLock (latch->readwr, thread_no);
1498                 break;
1499         case BtLockWrite:
1500                 WriteLock (latch->readwr, thread_no, line);
1501                 break;
1502         case BtLockAccess:
1503                 ReadLock (latch->access, thread_no);
1504                 break;
1505         case BtLockDelete:
1506                 WriteLock (latch->access, thread_no, line);
1507                 break;
1508         case BtLockParent:
1509                 WriteLock (latch->parent, thread_no, line);
1510                 break;
1511         case BtLockLink:
1512                 WriteLock (latch->link, thread_no, line);
1513                 break;
1514         }
1515 }
1516
1517 // remove write, read, or parent lock on requested page
1518
1519 void bt_unlockpage(BtLock mode, BtLatchSet *latch, ushort thread_no, uint line)
1520 {
1521         switch( mode ) {
1522         case BtLockRead:
1523                 ReadRelease (latch->readwr);
1524                 break;
1525         case BtLockWrite:
1526                 WriteRelease (latch->readwr);
1527                 break;
1528         case BtLockAccess:
1529                 ReadRelease (latch->access);
1530                 break;
1531         case BtLockDelete:
1532                 WriteRelease (latch->access);
1533                 break;
1534         case BtLockParent:
1535                 WriteRelease (latch->parent);
1536                 break;
1537         case BtLockLink:
1538                 WriteRelease (latch->link);
1539                 break;
1540         }
1541 }
1542
1543 //      allocate a new page
1544 //      return with page latched, but unlocked.
1545
1546 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents, ushort thread_no)
1547 {
1548 uint page_size = mgr->page_size, leaf_xtra = 0;
1549 unsigned char *freechain;
1550 uid page_no;
1551
1552         if( contents->lvl ) {
1553                 freechain = mgr->pagezero->freechain;
1554                 mgr->pagezero->upperpages++;
1555         } else {
1556                 freechain = mgr->pagezero->leafchain;
1557                 mgr->pagezero->leafpages++;
1558                 leaf_xtra = mgr->leaf_xtra;
1559                 page_size <<= leaf_xtra;
1560         }
1561
1562         //      lock allocation page
1563
1564         bt_mutexlock(mgr->lock);
1565
1566         // use empty chain first
1567         // else allocate new page
1568
1569         if( page_no = bt_getid(freechain) ) {
1570                 if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1571                         set->page = bt_mappage (mgr, set->latch);
1572                 else
1573                         return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1574
1575                 bt_putid(freechain, bt_getid(set->page->right));
1576
1577                 //  the page is currently nopromote and this
1578                 //  will keep bt_promote out.
1579
1580                 //      contents will replace this bit
1581                 //  and pin will keep bt_promote out
1582
1583                 contents->page_no = page_no;
1584                 contents->nopromote = 0;
1585
1586                 memcpy (set->page, contents, page_size);
1587
1588 //              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1589 //                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1590
1591                 bt_releasemutex(mgr->lock);
1592                 return 0;
1593         }
1594
1595         page_no = bt_getid(mgr->pagezero->alloc->right);
1596         bt_putid(mgr->pagezero->alloc->right, page_no+(1 << leaf_xtra));
1597
1598         // unlock allocation latch and
1599         //      extend file into new page.
1600
1601 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1602 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1603
1604         bt_releasemutex(mgr->lock);
1605
1606         //      keep bt_promote out of this page
1607         contents->nopromote = 1;
1608         contents->page_no = page_no;
1609
1610         if( pwrite (mgr->idx, contents, page_size, page_no << mgr->page_bits) < page_size )
1611                 fprintf(stderr, "Write %d error %d\n", (uint)page_no, errno);
1612
1613         if( set->latch = contents->lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1614                 set->page = bt_mappage (mgr, set->latch);
1615         else
1616                 return mgr->err_thread = thread_no, mgr->err;
1617
1618         // now pin will keep bt_promote out
1619
1620         set->page->nopromote = 0;
1621         return 0;
1622 }
1623
1624 //  find slot in page for given key at a given level
1625
1626 int bt_findslot (BtPage page, unsigned char *key, uint len)
1627 {
1628 uint diff, higher = page->cnt, low = 1, slot;
1629 uint good = 0;
1630
1631         //        make stopper key an infinite fence value
1632
1633         if( bt_getid (page->right) )
1634                 higher++;
1635         else
1636                 good++;
1637
1638         //      low is the lowest candidate.
1639         //  loop ends when they meet
1640
1641         //  higher is already
1642         //      tested as .ge. the passed key.
1643
1644         while( diff = higher - low ) {
1645                 slot = low + ( diff >> 1 );
1646                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1647                         low = slot + 1;
1648                 else
1649                         higher = slot, good++;
1650         }
1651
1652         //      return zero if key is on right link page
1653
1654         return good ? higher : 0;
1655 }
1656
1657 //  find and load page at given level for given key
1658 //      leave page rd or wr locked as requested
1659
1660 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, ushort thread_no)
1661 {
1662 uid page_no = ROOT_page, prevpage_no = 0;
1663 uint drill = 0xff, slot;
1664 BtLatchSet *prevlatch;
1665 uint mode, prevmode;
1666 BtPage prevpage;
1667 BtVal *val;
1668 BtKey *ptr;
1669
1670   //  start at root of btree and drill down
1671
1672   do {
1673         if( set->latch = drill ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1674           set->page = bt_mappage (mgr, set->latch);
1675         else
1676           return 0;
1677
1678         if( page_no > ROOT_page )
1679           bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1680
1681         //      release & unpin parent or left sibling page
1682
1683         if( prevpage_no ) {
1684           bt_unlockpage(prevmode, prevlatch, thread_no, __LINE__);
1685           bt_unpinlatch (prevlatch, 0, thread_no, __LINE__);
1686           prevpage_no = 0;
1687         }
1688
1689         // obtain mode lock using lock coupling through AccessLock
1690         // determine lock mode of drill level
1691
1692         mode = (drill == lvl) ? lock : BtLockRead; 
1693         bt_lockpage(mode, set->latch, thread_no, __LINE__);
1694
1695         // grab our fence key
1696
1697         ptr=keyptr(set->page,set->page->cnt);
1698
1699         if( set->page->free )
1700                 return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1701
1702         if( page_no > ROOT_page )
1703           bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
1704
1705         // re-read and re-lock root after determining actual level of root
1706
1707         if( set->page->lvl != drill) {
1708                 if( set->latch->page_no != ROOT_page )
1709                         return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1710                         
1711                 drill = set->page->lvl;
1712
1713                 if( lock != BtLockRead && drill == lvl ) {
1714                   bt_unlockpage(mode, set->latch, thread_no, __LINE__);
1715                   bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
1716                   continue;
1717                 }
1718         }
1719
1720         prevpage_no = set->latch->page_no;
1721         prevlatch = set->latch;
1722         prevpage = set->page;
1723         prevmode = mode;
1724
1725         //  if requested key is beyond our fence,
1726         //      slide to the right
1727
1728         if( keycmp (ptr, key, len) < 0 )
1729           if( page_no = bt_getid(set->page->right) )
1730                 continue;
1731
1732         //  if page is part of a delete operation,
1733         //      slide to the left;
1734
1735         if( set->page->kill ) {
1736           bt_lockpage(BtLockLink, set->latch, thread_no, __LINE__);
1737           page_no = bt_getid(set->page->left);
1738           bt_unlockpage(BtLockLink, set->latch, thread_no, __LINE__);
1739           continue;
1740         }
1741
1742         //  find key on page at this level
1743         //  and descend to requested level
1744
1745         if( slot = bt_findslot (set->page, key, len) ) {
1746           if( drill == lvl )
1747                 return slot;
1748
1749           // find next non-dead slot -- the fence key if nothing else
1750
1751           while( slotptr(set->page, slot)->dead )
1752                 if( slot++ < set->page->cnt )
1753                   continue;
1754                 else
1755                   return mgr->err = BTERR_struct, mgr->err_thread = thread_no, mgr->line = __LINE__, 0;
1756
1757           val = valptr(set->page, slot);
1758
1759           if( val->len == BtId )
1760                 page_no = bt_getid(val->value);
1761           else
1762                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct, 0;
1763
1764           drill--;
1765           continue;
1766          }
1767
1768         //  slide right into next page
1769
1770         page_no = bt_getid(set->page->right);
1771
1772   } while( page_no );
1773
1774   // return error on end of right chain
1775
1776   mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1777   return 0;     // return error
1778 }
1779
1780 //      return page to free list
1781 //      page must be delete & write locked
1782 //      and have no keys pointing to it.
1783
1784 void bt_freepage (BtMgr *mgr, BtPageSet *set, ushort thread_no)
1785 {
1786 unsigned char *freechain;
1787
1788         //      lock allocation page
1789
1790         bt_mutexlock (mgr->lock);
1791
1792         if( set->page->lvl ) {
1793                 freechain = mgr->pagezero->freechain;
1794                 mgr->pagezero->upperpages--;
1795         } else {
1796                 freechain = mgr->pagezero->leafchain;
1797                 mgr->pagezero->leafpages--;
1798         }
1799
1800         //      store chain
1801
1802         memcpy(set->page->right, freechain, BtId);
1803         bt_putid(freechain, set->latch->page_no);
1804         set->page->free = 1;
1805
1806 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1807 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1808
1809         // unlock released page
1810         // and unlock allocation page
1811
1812         bt_unlockpage (BtLockDelete, set->latch, thread_no, __LINE__);
1813         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1814         bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
1815         bt_releasemutex (mgr->lock);
1816 }
1817
1818 //      a fence key was deleted from a page
1819 //      push new fence value upwards
1820
1821 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl, ushort thread_no)
1822 {
1823 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1824 unsigned char value[BtId];
1825 BtKey* ptr;
1826 uint idx;
1827
1828         //      remove the old fence value
1829
1830         ptr = keyptr(set->page, set->page->cnt);
1831         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1832         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1833
1834         //  cache new fence value
1835
1836         ptr = keyptr(set->page, set->page->cnt);
1837         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1838
1839         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
1840         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1841
1842         //      insert new (now smaller) fence key
1843
1844         bt_putid (value, set->latch->page_no);
1845         ptr = (BtKey*)leftkey;
1846
1847         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
1848           return mgr->err_thread = thread_no, mgr->err;
1849
1850         //      now delete old fence key
1851
1852         ptr = (BtKey*)rightkey;
1853
1854         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1855                 return mgr->err_thread = thread_no, mgr->err;
1856
1857         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
1858         bt_unpinlatch(set->latch, 1, thread_no, __LINE__);
1859         return 0;
1860 }
1861
1862 //      root has a single child
1863 //      collapse a level from the tree
1864
1865 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root, ushort thread_no)
1866 {
1867 BtPageSet child[1];
1868 uid page_no;
1869 BtVal *val;
1870 uint idx;
1871
1872   // find the child entry and promote as new root contents
1873
1874   do {
1875         for( idx = 0; idx++ < root->page->cnt; )
1876           if( !slotptr(root->page, idx)->dead )
1877                 break;
1878
1879         val = valptr(root->page, idx);
1880
1881         if( val->len == BtId )
1882                 page_no = bt_getid (valptr(root->page, idx)->value);
1883         else
1884                 return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
1885
1886         if( child->latch = bt_pinlatch (mgr, page_no, thread_no) )
1887                 child->page = bt_mappage (mgr, child->latch);
1888         else
1889                 return mgr->err_thread = thread_no, mgr->err;
1890
1891         bt_lockpage (BtLockDelete, child->latch, thread_no, __LINE__);
1892         bt_lockpage (BtLockWrite, child->latch, thread_no, __LINE__);
1893
1894         memcpy (root->page, child->page, mgr->page_size);
1895         bt_freepage (mgr, child, thread_no);
1896
1897   } while( root->page->lvl > 1 && root->page->act == 1 );
1898
1899   bt_unlockpage (BtLockWrite, root->latch, thread_no, __LINE__);
1900   bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
1901   return 0;
1902 }
1903
1904 //  delete a page and manage key
1905 //  call with page writelocked
1906
1907 //      returns with page unpinned
1908 //      from the page pool.
1909
1910 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint lvl)
1911 {
1912 unsigned char lowerfence[BT_keyarray];
1913 uint page_size = mgr->page_size;
1914 BtPageSet right[1], temp[1];
1915 unsigned char value[BtId];
1916 uid page_no, right2;
1917 BtKey *ptr;
1918
1919         if( !lvl )
1920                 page_size <<= mgr->leaf_xtra;
1921
1922         //  cache original copy of original fence key
1923         //      that is going to be deleted.
1924
1925         ptr = keyptr(set->page, set->page->cnt);
1926         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1927
1928         //      pin and lock our right page
1929
1930         page_no = bt_getid(set->page->right);
1931
1932         if( right->latch = lvl ? bt_pinlatch (mgr, page_no, thread_no) : bt_pinleaf (mgr, page_no, thread_no) )
1933                 right->page = bt_mappage (mgr, right->latch);
1934         else
1935                 return 0;
1936
1937         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
1938
1939         if( right->page->kill )
1940                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1941
1942         // pull contents of right sibling over our empty page
1943         //      preserving our left page number, and its right page number.
1944
1945         bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
1946         page_no = bt_getid(set->page->left);
1947         memcpy (set->page, right->page, page_size);
1948         bt_putid (set->page->left, page_no);
1949         set->page->page_no = set->latch->page_no;
1950         bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
1951
1952         //  fix left link from far right page
1953
1954         if( right2 = bt_getid (right->page->right) ) {
1955           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
1956                 temp->page = bt_mappage (mgr, temp->latch);
1957           else
1958                 return 0;
1959
1960       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
1961           bt_putid (temp->page->left, set->latch->page_no);
1962           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
1963           bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
1964         } else if( !lvl ) {     // our page is now rightmost leaf
1965           bt_mutexlock (mgr->lock);
1966           bt_putid (mgr->pagezero->alloc->left, set->latch->page_no);
1967           bt_releasemutex(mgr->lock);
1968         }
1969
1970         // mark right page as being deleted and release lock
1971
1972         right->page->kill = 1;
1973         bt_unlockpage (BtLockWrite, right->latch, thread_no, __LINE__);
1974
1975         // redirect the new higher key directly to our new node
1976
1977         ptr = keyptr(set->page, set->page->cnt);
1978         bt_putid (value, set->latch->page_no);
1979
1980         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update, thread_no) )
1981           return mgr->err;
1982
1983         //      delete our original fence key in parent
1984
1985         ptr = (BtKey *)lowerfence;
1986
1987         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1, thread_no) )
1988           return mgr->err;
1989
1990         //      release write lock to our node
1991
1992         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
1993         bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
1994
1995         //  wait for all access to drain away with delete lock,
1996         //      then obtain write lock to right node and free it.
1997
1998         bt_lockpage (BtLockDelete, right->latch, thread_no, __LINE__);
1999         bt_lockpage (BtLockWrite, right->latch, thread_no, __LINE__);
2000         bt_freepage (mgr, right, thread_no);
2001         return 0;
2002 }
2003
2004 //  find and delete key on page by marking delete flag bit
2005 //  if page becomes empty, delete it from the btree
2006
2007 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, ushort thread_no)
2008 {
2009 uint slot, idx, found, fence;
2010 BtPageSet set[1];
2011 BtSlot *node;
2012 BtKey *ptr;
2013 BtVal *val;
2014
2015         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, thread_no) ) {
2016                 node = slotptr(set->page, slot);
2017                 ptr = keyptr(set->page, slot);
2018         } else
2019                 return mgr->err_thread = thread_no, mgr->err;
2020
2021         // if librarian slot, advance to real slot
2022
2023         if( node->type == Librarian ) {
2024                 ptr = keyptr(set->page, ++slot);
2025                 node = slotptr(set->page, slot);
2026         }
2027
2028         fence = slot == set->page->cnt;
2029
2030         // delete the key, ignore request if already dead
2031
2032         if( found = !keycmp (ptr, key, len) )
2033           if( found = node->dead == 0 ) {
2034                 val = valptr(set->page,slot);
2035                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
2036                 set->page->act--;
2037                 node->dead = 1;
2038
2039                 // collapse empty slots beneath the fence
2040                 // on interiour nodes
2041
2042                 if( lvl )
2043                  while( idx = set->page->cnt - 1 )
2044                   if( slotptr(set->page, idx)->dead ) {
2045                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
2046                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
2047                   } else
2048                         break;
2049           }
2050
2051         if( !found )
2052                 return 0;
2053
2054         //      did we delete a fence key in an upper level?
2055
2056         if( lvl && set->page->act && fence )
2057           if( bt_fixfence (mgr, set, lvl, thread_no) )
2058                 return mgr->err_thread = thread_no, mgr->err;
2059           else
2060                 return 0;
2061
2062         //      do we need to collapse root?
2063
2064         if( set->latch->page_no == ROOT_page && set->page->act == 1 )
2065           if( bt_collapseroot (mgr, set, thread_no) )
2066                 return mgr->err_thread = thread_no, mgr->err;
2067           else
2068                 return 0;
2069
2070         //      delete empty page
2071
2072         if( !set->page->act )
2073           return bt_deletepage (mgr, set, thread_no, set->page->lvl);
2074
2075         bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2076         bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2077         return 0;
2078 }
2079
2080 //      check page for space available,
2081 //      clean if necessary and return
2082 //      0 - page needs splitting
2083 //      >0  new slot value
2084
2085 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
2086 {
2087 uint page_size = mgr->page_size;
2088 BtPage page = set->page, frame;
2089 uint cnt = 0, idx = 0;
2090 uint max = page->cnt;
2091 uint newslot = max;
2092 BtKey *key;
2093 BtVal *val;
2094
2095         if( !set->page->lvl )
2096                 page_size <<= mgr->leaf_xtra;
2097
2098         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
2099                 return slot;
2100
2101         //      skip cleanup and proceed to split
2102         //      if there's not enough garbage
2103         //      to bother with.
2104
2105         if( page->garbage < page_size / 5 )
2106                 return 0;
2107
2108         frame = malloc (page_size);
2109         memcpy (frame, page, page_size);
2110
2111         // skip page info and set rest of page to zero
2112
2113         memset (page+1, 0, page_size - sizeof(*page));
2114
2115         page->min = page_size;
2116         page->garbage = 0;
2117         page->act = 0;
2118
2119         // clean up page first by
2120         // removing dead keys
2121
2122         while( cnt++ < max ) {
2123                 if( cnt == slot )
2124                         newslot = idx + 2;
2125
2126                 if( cnt < max || frame->lvl )
2127                   if( slotptr(frame,cnt)->dead )
2128                         continue;
2129
2130                 // copy the value across
2131
2132                 val = valptr(frame, cnt);
2133                 page->min -= val->len + sizeof(BtVal);
2134                 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
2135
2136                 // copy the key across
2137
2138                 key = keyptr(frame, cnt);
2139                 page->min -= key->len + sizeof(BtKey);
2140                 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
2141
2142                 // make a librarian slot
2143
2144                 slotptr(page, ++idx)->off = page->min;
2145                 slotptr(page, idx)->type = Librarian;
2146                 slotptr(page, idx)->dead = 1;
2147
2148                 // set up the slot
2149
2150                 slotptr(page, ++idx)->off = page->min;
2151                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
2152
2153                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
2154                   page->act++;
2155         }
2156
2157         page->cnt = idx;
2158         free (frame);
2159
2160         //      see if page has enough space now, or does it need splitting?
2161
2162         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
2163                 return newslot;
2164
2165         return 0;
2166 }
2167
2168 // split the root and raise the height of the btree
2169
2170 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right, ushort thread_no)
2171 {  
2172 unsigned char leftkey[BT_keyarray];
2173 uint nxt = mgr->page_size;
2174 unsigned char value[BtId];
2175 BtPage frame, page;
2176 BtPageSet left[1];
2177 uid left_page_no;
2178 BtKey *ptr;
2179 BtVal *val;
2180
2181         frame = malloc (mgr->page_size);
2182         memcpy (frame, root->page, mgr->page_size);
2183
2184         //      save left page fence key for new root
2185
2186         ptr = keyptr(root->page, root->page->cnt);
2187         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2188
2189         //  Obtain an empty page to use, and copy the current
2190         //  root contents into it, e.g. lower keys
2191
2192         if( bt_newpage(mgr, left, frame, thread_no) )
2193                 return mgr->err_thread = thread_no, mgr->err;
2194
2195         left_page_no = left->latch->page_no;
2196         bt_unpinlatch (left->latch, 1, thread_no, __LINE__);
2197         free (frame);
2198
2199         //      left link the pages together
2200
2201         page = bt_mappage (mgr, right);
2202         bt_putid (page->left, left_page_no);
2203
2204         // preserve the page info at the bottom
2205         // of higher keys and set rest to zero
2206
2207         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
2208
2209         // insert stopper key at top of newroot page
2210         // and increase the root height
2211
2212         nxt -= BtId + sizeof(BtVal);
2213         bt_putid (value, right->page_no);
2214         val = (BtVal *)((unsigned char *)root->page + nxt);
2215         memcpy (val->value, value, BtId);
2216         val->len = BtId;
2217
2218         nxt -= 2 + sizeof(BtKey);
2219         slotptr(root->page, 2)->off = nxt;
2220         ptr = (BtKey *)((unsigned char *)root->page + nxt);
2221         ptr->len = 2;
2222         ptr->key[0] = 0xff;
2223         ptr->key[1] = 0xff;
2224
2225         // insert lower keys page fence key on newroot page as first key
2226
2227         nxt -= BtId + sizeof(BtVal);
2228         bt_putid (value, left_page_no);
2229         val = (BtVal *)((unsigned char *)root->page + nxt);
2230         memcpy (val->value, value, BtId);
2231         val->len = BtId;
2232
2233         ptr = (BtKey *)leftkey;
2234         nxt -= ptr->len + sizeof(BtKey);
2235         slotptr(root->page, 1)->off = nxt;
2236         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
2237         
2238         bt_putid(root->page->right, 0);
2239         root->page->min = nxt;          // reset lowest used offset and key count
2240         root->page->cnt = 2;
2241         root->page->act = 2;
2242         root->page->lvl++;
2243
2244         mgr->pagezero->alloc->lvl = root->page->lvl;
2245
2246         // release and unpin root pages
2247
2248         bt_unlockpage(BtLockWrite, root->latch, thread_no, __LINE__);
2249         bt_unpinlatch (root->latch, 1, thread_no, __LINE__);
2250
2251         bt_unpinlatch (right, 1, thread_no, __LINE__);
2252         return 0;
2253 }
2254
2255 //  split already locked full node
2256 //      leave it locked.
2257 //      return pool entry for new right
2258 //      page, pinned & unlocked
2259
2260 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, ushort thread_no, uint linkleft)
2261 {
2262 uint page_size = mgr->page_size;
2263 BtPageSet right[1], temp[1];
2264 uint cnt = 0, idx = 0, max;
2265 uint lvl = set->page->lvl;
2266 BtPage frame;
2267 BtKey *key;
2268 BtVal *val;
2269 uid right2;
2270 uint entry;
2271 uint prev;
2272
2273         if( !set->page->lvl )
2274                 page_size <<= mgr->leaf_xtra;
2275
2276         //  split higher half of keys to frame
2277
2278         frame = malloc (page_size);
2279         memset (frame, 0, page_size);
2280         frame->min = page_size;
2281         max = set->page->cnt;
2282         cnt = max / 2;
2283         idx = 0;
2284
2285         while( cnt++ < max ) {
2286                 if( cnt < max || set->page->lvl )
2287                   if( slotptr(set->page, cnt)->dead )
2288                         continue;
2289
2290                 val = valptr(set->page, cnt);
2291                 frame->min -= val->len + sizeof(BtVal);
2292                 memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
2293
2294                 key = keyptr(set->page, cnt);
2295                 frame->min -= key->len + sizeof(BtKey);
2296                 memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
2297
2298                 //      add librarian slot
2299
2300                 slotptr(frame, ++idx)->off = frame->min;
2301                 slotptr(frame, idx)->type = Librarian;
2302                 slotptr(frame, idx)->dead = 1;
2303
2304                 //  add actual slot
2305
2306                 slotptr(frame, ++idx)->off = frame->min;
2307                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
2308
2309                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
2310                   frame->act++;
2311         }
2312
2313         frame->cnt = idx;
2314         frame->lvl = lvl;
2315
2316         // link right node
2317
2318         if( set->latch->page_no > ROOT_page ) {
2319                 right2 = bt_getid (set->page->right);
2320                 bt_putid (frame->right, right2);
2321
2322                 if( linkleft )
2323                         bt_putid (frame->left, set->latch->page_no);
2324         }
2325
2326         //      get new free page and write higher keys to it.
2327
2328         if( bt_newpage(mgr, right, frame, thread_no) )
2329                 return 0;
2330
2331         //      link far right's left pointer to new page
2332
2333         if( linkleft && set->latch->page_no > ROOT_page )
2334          if( right2 ) {
2335           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2336                 temp->page = bt_mappage (mgr, temp->latch);
2337           else
2338                 return 0;
2339
2340       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2341           bt_putid (temp->page->left, right->latch->page_no);
2342           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2343           bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
2344          } else if( !lvl ) {    // page is rightmost leaf
2345           bt_mutexlock (mgr->lock);
2346           bt_putid (mgr->pagezero->alloc->left, right->latch->page_no);
2347           bt_releasemutex(mgr->lock);
2348          }
2349
2350         // process lower keys
2351
2352         memcpy (frame, set->page, page_size);
2353         memset (set->page+1, 0, page_size - sizeof(*set->page));
2354
2355         set->page->min = page_size;
2356         set->page->garbage = 0;
2357         set->page->act = 0;
2358         max /= 2;
2359         cnt = 0;
2360         idx = 0;
2361
2362         //  assemble page of smaller keys
2363
2364         while( cnt++ < max ) {
2365                 if( slotptr(frame, cnt)->dead )
2366                         continue;
2367                 val = valptr(frame, cnt);
2368                 set->page->min -= val->len + sizeof(BtVal);
2369                 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
2370
2371                 key = keyptr(frame, cnt);
2372                 set->page->min -= key->len + sizeof(BtKey);
2373                 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
2374
2375                 //      add librarian slot
2376
2377                 slotptr(set->page, ++idx)->off = set->page->min;
2378                 slotptr(set->page, idx)->type = Librarian;
2379                 slotptr(set->page, idx)->dead = 1;
2380
2381                 //      add actual slot
2382
2383                 slotptr(set->page, ++idx)->off = set->page->min;
2384                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
2385                 set->page->act++;
2386         }
2387
2388         bt_putid(set->page->right, right->latch->page_no);
2389         set->page->cnt = idx;
2390         free(frame);
2391
2392         entry = right->latch - (set->page->lvl ? mgr->latchsets : mgr->leafsets);
2393         return entry;
2394 }
2395
2396 //      fix keys for newly split page
2397 //      call with both pages pinned & locked
2398 //  return unlocked and unpinned
2399
2400 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right, ushort thread_no)
2401 {
2402 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
2403 unsigned char value[BtId];
2404 uint lvl = set->page->lvl;
2405 BtPageSet temp[1];
2406 BtPage page;
2407 BtKey *ptr;
2408 uid right2;
2409
2410         // if current page is the root page, split it
2411
2412         if( set->latch->page_no == ROOT_page )
2413                 return bt_splitroot (mgr, set, right, thread_no);
2414
2415         ptr = keyptr(set->page, set->page->cnt);
2416         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
2417
2418         page = bt_mappage (mgr, right);
2419
2420         ptr = keyptr(page, page->cnt);
2421         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
2422
2423         //      splice in far right page's left page_no
2424
2425         if( right2 = bt_getid (page->right) ) {
2426           if( temp->latch = lvl ? bt_pinlatch (mgr, right2, thread_no) : bt_pinleaf (mgr, right2, thread_no) )
2427                 temp->page = bt_mappage (mgr, temp->latch);
2428           else
2429                 return 0;
2430
2431       bt_lockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2432           bt_putid (temp->page->left, right->page_no);
2433           bt_unlockpage(BtLockLink, temp->latch, thread_no, __LINE__);
2434           bt_unpinlatch (temp->latch, 1, thread_no, __LINE__);
2435         } else if( !lvl ) {  // right page is far right page
2436           bt_mutexlock (mgr->lock);
2437           bt_putid (mgr->pagezero->alloc->left, right->page_no);
2438           bt_releasemutex(mgr->lock);
2439         }
2440         // insert new fences in their parent pages
2441
2442         bt_lockpage (BtLockParent, right, thread_no, __LINE__);
2443
2444         bt_lockpage (BtLockParent, set->latch, thread_no, __LINE__);
2445         bt_unlockpage (BtLockWrite, set->latch, thread_no, __LINE__);
2446
2447         // insert new fence for reformulated left block of smaller keys
2448
2449         bt_putid (value, set->latch->page_no);
2450         ptr = (BtKey *)leftkey;
2451
2452         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2453                 return mgr->err_thread = thread_no, mgr->err;
2454
2455         // switch fence for right block of larger keys to new right page
2456
2457         bt_putid (value, right->page_no);
2458         ptr = (BtKey *)rightkey;
2459
2460         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique, thread_no) )
2461                 return mgr->err_thread = thread_no, mgr->err;
2462
2463         bt_unlockpage (BtLockParent, set->latch, thread_no, __LINE__);
2464         bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2465
2466         bt_unlockpage (BtLockParent, right, thread_no, __LINE__);
2467         bt_unpinlatch (right, 1, thread_no, __LINE__);
2468         return 0;
2469 }
2470
2471 //      install new key and value onto page
2472 //      page must already be checked for
2473 //      adequate space
2474
2475 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2476 {
2477 uint idx, librarian;
2478 BtSlot *node;
2479 BtKey *ptr;
2480 BtVal *val;
2481 int rate;
2482
2483         //      if previous slot is a librarian slot, use it
2484
2485         if( slot > 1 )
2486           if( slotptr(set->page, slot-1)->type == Librarian )
2487                 slot--;
2488
2489         // copy value onto page
2490
2491         set->page->min -= vallen + sizeof(BtVal);
2492         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2493         memcpy (val->value, value, vallen);
2494         val->len = vallen;
2495
2496         // copy key onto page
2497
2498         set->page->min -= keylen + sizeof(BtKey);
2499         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2500         memcpy (ptr->key, key, keylen);
2501         ptr->len = keylen;
2502         
2503         //      find first empty slot at or above our insert slot
2504
2505         for( idx = slot; idx < set->page->cnt; idx++ )
2506           if( slotptr(set->page, idx)->dead )
2507                 break;
2508
2509         // now insert key into array before slot.
2510
2511         //      if we're going all the way to the top,
2512         //      add as many librarian slots as
2513         //      makes sense.
2514
2515         if( idx == set->page->cnt ) {
2516         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2517
2518                 librarian = ++idx - slot;
2519                 avail /= sizeof(BtSlot);
2520
2521                 if( avail < 0 )
2522                         avail = 0;
2523
2524                 if( librarian > avail )
2525                         librarian = avail;
2526
2527                 if( librarian ) {
2528                         rate = (idx - slot) / librarian;
2529                         set->page->cnt += librarian;
2530                         idx += librarian;
2531                 } else
2532                         rate = 0;
2533         } else
2534                 librarian = 0, rate = 0;
2535
2536         //  transfer slots and add librarian slots
2537
2538         while( idx > slot ) {
2539                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2540
2541                 //      add librarian slot per rate
2542
2543                 if( librarian )
2544                  if( (idx - slot)/2 <= librarian * rate ) {
2545                         node = slotptr(set->page, --idx);
2546                         node->off = node[1].off;
2547                         node->type = Librarian;
2548                         node->dead = 1;
2549                         librarian--;
2550                   }
2551
2552                 --idx;
2553         }
2554
2555         set->page->act++;
2556
2557         //      fill in new slot
2558
2559         node = slotptr(set->page, slot);
2560         node->off = set->page->min;
2561         node->type = type;
2562         node->dead = 0;
2563         return 0;
2564 }
2565
2566 //  Insert new key into the btree at given level.
2567 //      either add a new key or update/add an existing one
2568
2569 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type, ushort thread_no)
2570 {
2571 uint slot, idx, len, entry;
2572 BtPageSet set[1];
2573 BtSlot *node;
2574 BtKey *ptr;
2575 BtVal *val;
2576
2577   while( 1 ) { // find the page and slot for the current key
2578         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, thread_no) ) {
2579                 node = slotptr(set->page, slot);
2580                 ptr = keyptr(set->page, slot);
2581         } else
2582                 return mgr->err;
2583
2584         // if librarian slot == found slot, advance to real slot
2585
2586         if( node->type == Librarian ) {
2587                 node = slotptr(set->page, ++slot);
2588                 ptr = keyptr(set->page, slot);
2589         }
2590
2591         //  if inserting a duplicate key or unique
2592         //      key that doesn't exist on the page,
2593         //      check for adequate space on the page
2594         //      and insert the new key before slot.
2595
2596         switch( type ) {
2597         case Unique:
2598         case Duplicate:
2599           if( !keycmp (ptr, key, keylen) )
2600                 break;
2601
2602           if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2603             if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
2604                   return mgr->err;
2605                 else
2606                   goto insxit;
2607
2608           if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2609             if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2610                   continue;
2611
2612           return mgr->err_thread = thread_no, mgr->err;
2613
2614         case Update:
2615           if( keycmp (ptr, key, keylen) )
2616                 goto insxit;
2617
2618           break;
2619         }
2620
2621         // if key already exists, update value and return
2622
2623         val = valptr(set->page, slot);
2624
2625         if( val->len >= vallen ) {
2626           if( node->dead )
2627                 set->page->act++;
2628           node->type = type;
2629           node->dead = 0;
2630
2631           set->page->garbage += val->len - vallen;
2632           val->len = vallen;
2633           memcpy (val->value, value, vallen);
2634           goto insxit;
2635         }
2636
2637         //  new update value doesn't fit in existing value area
2638         //      make sure page has room
2639
2640         if( !node->dead )
2641           set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2642         else
2643           set->page->act++;
2644
2645         node->type = type;
2646         node->dead = 0;
2647
2648         if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2649           break;
2650
2651         if( entry = bt_splitpage (mgr, set, thread_no, 1) )
2652           if( !bt_splitkeys (mgr, set, entry + (lvl ? mgr->latchsets : mgr->leafsets), thread_no) )
2653                 continue;
2654
2655         return mgr->err_thread = thread_no, mgr->err;
2656   }
2657
2658   //  copy key and value onto page and update slot
2659
2660   set->page->min -= vallen + sizeof(BtVal);
2661   val = (BtVal*)((unsigned char *)set->page + set->page->min);
2662   memcpy (val->value, value, vallen);
2663   val->len = vallen;
2664
2665   set->page->min -= keylen + sizeof(BtKey);
2666   ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2667   memcpy (ptr->key, key, keylen);
2668   ptr->len = keylen;
2669         
2670   slotptr(set->page,slot)->off = set->page->min;
2671
2672 insxit:
2673   bt_unlockpage(BtLockWrite, set->latch, thread_no, __LINE__);
2674   bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
2675   return 0;
2676 }
2677
2678 //      determine actual page where key is located
2679 //  return slot number
2680
2681 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, BtPageSet *set)
2682 {
2683 BtKey *key = keyptr(source,src), *ptr;
2684 uint slot = locks[src].slot;
2685 uint entry;
2686
2687         if( locks[src].reuse )
2688           entry = locks[src-1].entry;
2689         else
2690           entry = locks[src].entry;
2691
2692         if( slot ) {
2693                 set->latch = mgr->leafsets + entry;
2694                 set->page = bt_mappage (mgr, set->latch);
2695                 return slot;
2696         }
2697
2698         //      find where our key is located 
2699         //      on current page or pages split on
2700         //      same page txn operations.
2701
2702         do {
2703                 set->latch = mgr->leafsets + entry;
2704                 set->page = bt_mappage (mgr, set->latch);
2705
2706                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2707                   if( slotptr(set->page, slot)->type == Librarian )
2708                         slot++;
2709                   if( locks[src].reuse )
2710                         locks[src].entry = entry;
2711                   return slot;
2712                 }
2713         } while( entry = set->latch->split );
2714
2715         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2716         return 0;
2717 }
2718
2719 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2720 {
2721 BtKey *key = keyptr(source, src);
2722 BtVal *val = valptr(source, src);
2723 BtLatchSet *latch;
2724 BtPageSet set[1];
2725 uint entry, slot;
2726
2727   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2728         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2729           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,src)->type) )
2730                 return mgr->err_thread = thread_no, mgr->err;
2731
2732           set->page->lsn = lsn;
2733           return 0;
2734         }
2735
2736         //  split page
2737
2738         if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2739           latch = mgr->leafsets + entry;
2740         else
2741           return mgr->err;
2742
2743         //      splice right page into split chain
2744         //      and WriteLock it
2745
2746         bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2747         latch->split = set->latch->split;
2748         set->latch->split = entry;
2749
2750         // clear slot number for atomic page
2751
2752         locks[src].slot = 0;
2753   }
2754
2755   return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_atomic;
2756 }
2757
2758 //      perform delete from smaller btree
2759 //  insert a delete slot if not found there
2760
2761 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint src, ushort thread_no, logseqno lsn)
2762 {
2763 BtKey *key = keyptr(source, src);
2764 uint idx, slot, entry;
2765 BtLatchSet *latch;
2766 BtPageSet set[1];
2767 BtSlot *node;
2768 BtKey *ptr;
2769 BtVal *val;
2770
2771   while( slot = bt_atomicpage (mgr, source, locks, src, set) ) {
2772         node = slotptr(set->page, slot);
2773         ptr = keyptr(set->page, slot);
2774         val = valptr(set->page, slot);
2775
2776         //  if slot is not found on cache btree, insert a delete slot
2777         //      otherwise ignore the request.
2778
2779         if( keycmp (ptr, key->key, key->len) )
2780          if( !mgr->type )
2781           if( slot = bt_cleanpage(mgr, set, key->len, slot, 0) )
2782                 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete);
2783           else { //  split page before inserting Delete slot
2784                 if( entry = bt_splitpage (mgr, set, thread_no, 0) )
2785                   latch = mgr->leafsets + entry;
2786                 else
2787               return mgr->err;
2788
2789                 //      splice right page into split chain
2790                 //      and WriteLock it
2791
2792                 bt_lockpage(BtLockWrite, latch, thread_no, __LINE__);
2793                 latch->split = set->latch->split;
2794                 set->latch->split = entry;
2795
2796                 // clear slot number for atomic page
2797
2798                 locks[src].slot = 0;
2799                 continue;
2800           }
2801          else
2802            return 0;
2803
2804         //      if node is already dead,
2805         //      ignore the request.
2806
2807         if( node->type == Delete || node->dead )
2808                 return 0;
2809
2810         node->type = Delete;
2811
2812         //  if main LSM btree, delete the slot
2813
2814         if( mgr->type ) {
2815                 set->page->act--;
2816                 node->dead = 1;
2817         }
2818
2819         __sync_fetch_and_add(&mgr->found, 1);
2820         set->page->lsn = lsn;
2821         return 0;
2822   }
2823
2824   return mgr->line = __LINE__, mgr->err_thread = thread_no, mgr->err = BTERR_struct;
2825 }
2826
2827 //      release master's splits from right to left
2828
2829 void bt_atomicrelease (BtMgr *mgr, uint entry, ushort thread_no)
2830 {
2831 BtLatchSet *latch = mgr->leafsets + entry;
2832
2833         if( latch->split )
2834                 bt_atomicrelease (mgr, latch->split, thread_no);
2835
2836         latch->split = 0;
2837         bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
2838         bt_unpinlatch(latch, 1, thread_no, __LINE__);
2839 }
2840
2841 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2842 {
2843 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2844 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2845
2846         return keycmp (key1, key2->key, key2->len);
2847 }
2848 //      atomic modification of a batch of keys.
2849
2850 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2851 {
2852 uint src, idx, slot, samepage, entry, que = 0;
2853 BtKey *key, *ptr, *key2;
2854 int result = 0;
2855 BtSlot temp[1];
2856 logseqno lsn;
2857 int type;
2858
2859   // stable sort the list of keys into order to
2860   //    prevent deadlocks between threads.
2861 /*
2862   for( src = 1; src++ < source->cnt; ) {
2863         *temp = *slotptr(source,src);
2864         key = keyptr (source,src);
2865
2866         for( idx = src; --idx; ) {
2867           key2 = keyptr (source,idx);
2868           if( keycmp (key, key2->key, key2->len) < 0 ) {
2869                 *slotptr(source,idx+1) = *slotptr(source,idx);
2870                 *slotptr(source,idx) = *temp;
2871           } else
2872                 break;
2873         }
2874   }
2875 */
2876         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2877   //  add entries to redo log
2878
2879   if( bt->mgr->pagezero->redopages )
2880         lsn = bt_txnredo (bt->mgr, source, bt->thread_no);
2881   else
2882         lsn = 0;
2883
2884   //  perform the individual actions in the transaction
2885
2886   if( bt_atomicexec (bt->mgr, source, lsn, 0, bt->thread_no) )
2887         return bt->mgr->err;
2888
2889   // if number of active pages
2890   // is greater than the buffer pool
2891   // promote page into larger btree
2892
2893   if( bt->main )
2894    while( bt->mgr->pagezero->leafpages > bt->mgr->leaftotal - *bt->mgr->thread_no )
2895         if( bt_promote (bt) )
2896           return bt->mgr->err;
2897
2898   // return success
2899
2900   return 0;
2901 }
2902
2903 //      execute the source list of inserts/deletes
2904
2905 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, logseqno lsn, int lsm, ushort thread_no)
2906 {
2907 uint slot, src, idx, samepage, entry;
2908 BtPageSet set[1], prev[1];
2909 unsigned char value[BtId];
2910 BtLatchSet *latch;
2911 uid right_page_no;
2912 AtomicTxn *locks;
2913 BtKey *key, *ptr;
2914 BtPage page;
2915 BtVal *val;
2916
2917   locks = calloc (source->cnt + 1, sizeof(AtomicTxn));
2918
2919   // Load the leaf page for each key
2920   // group same page references with reuse bit
2921
2922   for( src = 0; src++ < source->cnt; ) {
2923         key = keyptr(source, src);
2924
2925         // first determine if this modification falls
2926         // on the same page as the previous modification
2927         //      note that the far right leaf page is a special case
2928
2929         if( samepage = src > 1 )
2930           samepage = !bt_getid(set->page->right) || keycmp (ptr, key->key, key->len) >= 0;
2931
2932         if( !samepage )
2933           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, thread_no) )
2934                 ptr = keyptr(set->page, set->page->cnt), set->latch->split = 0;
2935           else
2936                 return mgr->err;
2937         else
2938           slot = 0;
2939
2940         if( slot )
2941          if( slotptr(set->page, slot)->type == Librarian )
2942           slot++;
2943
2944         entry = set->latch - mgr->leafsets;
2945         locks[src].reuse = samepage;
2946         locks[src].entry = entry;
2947         locks[src].slot = slot;
2948
2949         //      capture current lsn for master page
2950
2951         locks[src].reqlsn = set->page->lsn;
2952   }
2953
2954   // insert or delete each key
2955   // process any splits or merges
2956   // run through txn list backwards
2957
2958   samepage = source->cnt + 1;
2959
2960   for( src = source->cnt; src; src-- ) {
2961         if( locks[src].reuse )
2962           continue;
2963
2964         //  perform the txn operations
2965         //      from smaller to larger on
2966         //  the same page
2967
2968         for( idx = src; idx < samepage; idx++ )
2969          switch( slotptr(source,idx)->type ) {
2970          case Delete:
2971           if( bt_atomicdelete (mgr, source, locks, idx, thread_no, lsn) )
2972                 return mgr->err;
2973           break;
2974
2975         case Duplicate:
2976         case Unique:
2977           if( bt_atomicinsert (mgr, source, locks, idx, thread_no, lsn) )
2978                 return mgr->err;
2979           break;
2980
2981         default:
2982           bt_atomicpage (mgr, source, locks, idx, set);
2983           break;
2984         }
2985
2986         //      after the same page operations have finished,
2987         //  process master page for splits or deletion.
2988
2989         latch = prev->latch = mgr->leafsets + locks[src].entry;
2990         prev->page = bt_mappage (mgr, prev->latch);
2991         samepage = src;
2992
2993         //  pick-up all splits from master page
2994         //      each one is already pinned & WriteLocked.
2995
2996         while( entry = prev->latch->split ) {
2997           set->latch = mgr->leafsets + entry;
2998           set->page = bt_mappage (mgr, set->latch);
2999
3000           // delete empty master page by undoing its split
3001           //  (this is potentially another empty page)
3002           //  note that there are no pointers to it yet
3003
3004           if( !prev->page->act ) {
3005                 memcpy (set->page->left, prev->page->left, BtId);
3006                 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
3007                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3008                 prev->latch->split = set->latch->split;
3009                 bt_freepage (mgr, set, thread_no);
3010                 continue;
3011           }
3012
3013           // remove empty split page from the split chain
3014           // and return it to the free list. No other
3015           // thread has its page number yet.
3016
3017           if( !set->page->act ) {
3018                 memcpy (prev->page->right, set->page->right, BtId);
3019                 prev->latch->split = set->latch->split;
3020
3021                 bt_lockpage (BtLockDelete, set->latch, thread_no, __LINE__);
3022                 bt_freepage (mgr, set, thread_no);
3023                 continue;
3024           }
3025
3026           //  update prev's fence key
3027
3028           ptr = keyptr(prev->page,prev->page->cnt);
3029           bt_putid (value, prev->latch->page_no);
3030
3031           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique, thread_no) )
3032                 return mgr->err;
3033
3034           // splice in the left link into the split page
3035
3036           bt_putid (set->page->left, prev->latch->page_no);
3037           *prev = *set;
3038         }
3039
3040         //  update left pointer in next right page from last split page
3041         //      (if all splits were reversed or none occurred, latch->split == 0)
3042
3043         if( latch->split ) {
3044           //  fix left pointer in master's original (now split)
3045           //  far right sibling or set rightmost page in page zero
3046
3047           if( right_page_no = bt_getid (prev->page->right) ) {
3048                 if( set->latch = bt_pinleaf (mgr, right_page_no, thread_no) )
3049                   set->page = bt_mappage (mgr, set->latch);
3050                 else
3051                   return mgr->err;
3052
3053             bt_lockpage (BtLockLink, set->latch, thread_no, __LINE__);
3054             bt_putid (set->page->left, prev->latch->page_no);
3055             bt_unlockpage (BtLockLink, set->latch, thread_no, __LINE__);
3056                 bt_unpinlatch (set->latch, 1, thread_no, __LINE__);
3057           } else {      // prev is rightmost page
3058             bt_mutexlock (mgr->lock);
3059                 bt_putid (mgr->pagezero->alloc->left, prev->latch->page_no);
3060             bt_releasemutex(mgr->lock);
3061           }
3062
3063           //  switch the original fence key from the
3064           //  master page to the last split page.
3065
3066           ptr = keyptr(prev->page,prev->page->cnt);
3067           bt_putid (value, prev->latch->page_no);
3068
3069           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update, thread_no) )
3070                 return mgr->err;
3071
3072           //  unlock and unpin the split pages
3073
3074           bt_atomicrelease (mgr, latch->split, thread_no);
3075
3076           //  unlock and unpin the master page
3077
3078           latch->split = 0;
3079           bt_unlockpage(BtLockWrite, latch, thread_no, __LINE__);
3080           bt_unpinlatch(latch, 1, thread_no, __LINE__);
3081           continue;
3082         }
3083
3084         //      since there are no splits remaining, we're
3085         //  finished if master page occupied
3086
3087         if( prev->page->act ) {
3088           bt_unlockpage(BtLockWrite, prev->latch, thread_no, __LINE__);
3089           bt_unpinlatch(prev->latch, 1, thread_no, __LINE__);
3090           continue;
3091         }
3092
3093         // any and all splits were reversed, and the
3094         // master page located in prev is empty, delete it
3095
3096         if( bt_deletepage (mgr, prev, thread_no, 0) )
3097                 return mgr->err;
3098   }
3099
3100   free (locks);
3101   return 0;
3102 }
3103
3104 //  pick & promote a page into the larger btree
3105
3106 BTERR bt_promote (BtDb *bt)
3107 {
3108 uint entry, slot, idx;
3109 BtPageSet set[1];
3110 BtSlot *node;
3111 BtKey *ptr;
3112 BtVal *val;
3113
3114   while( 1 ) {
3115 #ifdef unix
3116         entry = __sync_fetch_and_add(&bt->mgr->leafpromote, 1);
3117 #else
3118         entry = _InterlockedIncrement (&bt->mgr->leafpromote) - 1;
3119 #endif
3120         entry %= bt->mgr->leaftotal;
3121
3122         if( !entry )
3123                 continue;
3124
3125         set->latch = bt->mgr->leafsets + entry;
3126
3127         //  skip this entry if it has never been used
3128
3129         if( !set->latch->page_no )
3130           continue;
3131
3132         if( !bt_mutextry(set->latch->modify) )
3133                 continue;
3134
3135         //  skip this entry if it is pinned
3136
3137         if( set->latch->pin & ~CLOCK_bit ) {
3138           bt_releasemutex(set->latch->modify);
3139           continue;
3140         }
3141
3142         set->page = bt_mappage (bt->mgr, set->latch);
3143
3144         // entry has no right sibling
3145
3146         if( !bt_getid (set->page->right) ) {
3147           bt_releasemutex(set->latch->modify);
3148           continue;
3149         }
3150
3151         // entry is being killed or constructed
3152
3153         if( set->page->nopromote || set->page->kill ) {
3154           bt_releasemutex(set->latch->modify);
3155           continue;
3156         }
3157
3158         //  pin the page for our access
3159         //      and leave it locked for the
3160         //      duration of the promotion.
3161
3162         set->latch->pin++;
3163         bt_lockpage (BtLockWrite, set->latch, bt->thread_no, __LINE__);
3164         bt_releasemutex(set->latch->modify);
3165
3166         // transfer slots in our selected page to the main btree
3167 if( !(entry % 100) )
3168 fprintf(stderr, "Promote entry %d page %d, %d keys\n", entry, set->latch->page_no, set->page->act);
3169
3170         if( bt_atomicexec (bt->main, set->page, 0, bt->mgr->pagezero->redopages ? 1 : 0, bt->thread_no) ) {
3171                 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
3172                 return bt->main->err;
3173         }
3174
3175         //  now delete the page
3176
3177         if( bt_deletepage (bt->mgr, set, bt->thread_no, 0) )
3178                 fprintf (stderr, "Promote: delete page err = %d, threadno = %d\n", bt->mgr->err, bt->mgr->err_thread);
3179
3180         return bt->mgr->err;
3181   }
3182 }
3183
3184 //      find unique key == given key, or first duplicate key in
3185 //      leaf level and return number of value bytes
3186 //      or (-1) if not found.
3187
3188 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
3189 {
3190 int ret = -1, type;
3191 BtPageSet set[1];
3192 BtSlot *node;
3193 BtKey *ptr;
3194 BtVal *val;
3195 uint slot;
3196
3197  for( type = 0; type < 2; type++ )
3198   if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, bt->thread_no) ) {
3199         node = slotptr(set->page, slot);
3200
3201         //      skip librarian slot place holder
3202
3203         if( node->type == Librarian )
3204           node = slotptr(set->page, ++slot);
3205
3206         ptr = keyptr(set->page, slot);
3207
3208         //      not there if we reach the stopper key
3209         //  or the key doesn't match what's on the page.
3210
3211         if( slot == set->page->cnt )
3212           if( !bt_getid (set->page->right) ) {
3213                 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3214                 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3215                 continue;
3216         }
3217
3218         if( keycmp (ptr, key, keylen) ) {
3219                 bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3220                 bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3221                 continue;
3222         }
3223
3224         // key matches, return >= 0 value bytes copied
3225         //      or -1 if not there.
3226
3227         if( node->type == Delete || node->dead ) {
3228                 ret = -1;
3229                 goto findxit;
3230         }
3231
3232         val = valptr (set->page,slot);
3233
3234         if( valmax > val->len )
3235                 valmax = val->len;
3236
3237         memcpy (value, val->value, valmax);
3238         ret = valmax;
3239         goto findxit;
3240   }
3241
3242   ret = -1;
3243
3244 findxit:
3245   if( type < 2 ) {
3246         bt_unlockpage (BtLockRead, set->latch, bt->thread_no, __LINE__);
3247         bt_unpinlatch (set->latch, 0, bt->thread_no, __LINE__);
3248   }
3249   return ret;
3250 }
3251
3252 //      set cursor to highest slot on right-most page
3253
3254 BTERR bt_lastkey (BtDb *bt)
3255 {
3256 uid cache_page_no = bt_getid (bt->mgr->pagezero->alloc->left);
3257 uid main_page_no = bt_getid (bt->main->pagezero->alloc->left);
3258
3259         if( bt->cacheset->latch = bt_pinleaf (bt->mgr, cache_page_no, bt->thread_no) )
3260                 bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
3261         else
3262                 return bt->mgr->err;
3263
3264     bt_lockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3265         bt->cacheslot = bt->cacheset->page->cnt;
3266
3267         if( bt->mainset->latch = bt_pinleaf (bt->main, main_page_no, bt->thread_no) )
3268                 bt->mainset->page = bt_mappage (bt->main, bt->mainset->latch);
3269         else
3270                 return bt->main->err;
3271
3272     bt_lockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3273         bt->mainslot = bt->mainset->page->cnt;
3274         bt->phase = 2;
3275         return 0;
3276 }
3277
3278 //      return previous slot on cursor page
3279
3280 uint bt_prevslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3281 {
3282 uid next, us = set->latch->page_no;
3283
3284   while( 1 ) {
3285         while( --slot )
3286           if( slotptr(set->page, slot)->dead )
3287                 continue;
3288           else
3289                 return slot;
3290
3291         next = bt_getid(set->page->left);
3292
3293         if( !next )
3294                 return 0;
3295
3296         do {
3297           bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3298           bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
3299
3300           if( set->latch = bt_pinleaf (mgr, next, thread_no) )
3301             set->page = bt_mappage (mgr, set->latch);
3302           else
3303             return 0;
3304
3305       bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3306           next = bt_getid (set->page->right);
3307
3308         } while( next != us );
3309
3310     slot = set->page->cnt + 1;
3311   }
3312 }
3313
3314 //  advance to previous key
3315
3316 BTERR bt_prevkey (BtDb *bt)
3317 {
3318 int cmp;
3319
3320   // first advance last key(s) one previous slot
3321
3322   while( 1 ) {
3323         switch( bt->phase ) {
3324         case 0:
3325           bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3326           break;
3327         case 1:
3328           bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3329           break;
3330         case 2:
3331           bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3332           bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3333         break;
3334         }
3335
3336   // return next key
3337
3338         if( bt->cacheslot ) {
3339           bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3340           bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3341           bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3342         }
3343
3344         if( bt->mainslot ) {
3345           bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3346           bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3347           bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3348         }
3349
3350         if( bt->mainslot && bt->cacheslot )
3351           cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3352         else if( bt->cacheslot )
3353           cmp = 1;
3354         else if( bt->mainslot )
3355           cmp = -1;
3356         else
3357           return 0;
3358
3359         //  cache key is larger
3360
3361         if( cmp > 0 ) {
3362           bt->phase = 0;
3363           if( bt->cachenode->type == Delete )
3364                 continue;
3365           return bt->cacheslot;
3366         }
3367
3368         //  main key is larger
3369
3370         if( cmp < 0 ) {
3371           bt->phase = 1;
3372           return bt->mainslot;
3373         }
3374
3375         //      keys are equal
3376
3377         bt->phase = 2;
3378
3379         if( bt->cachenode->type == Delete )
3380                 continue;
3381
3382         return bt->cacheslot;
3383   }
3384 }
3385
3386 //      advance to next slot in cache or main btree
3387 //      return 0 for EOF/error
3388
3389 uint bt_nextslot (BtMgr *mgr, BtPageSet *set, uint slot, ushort thread_no)
3390 {
3391 BtPage page;
3392 uid page_no;
3393
3394   while( 1 ) {
3395         while( slot++ < set->page->cnt )
3396           if( slotptr(set->page, slot)->dead )
3397                 continue;
3398           else if( slot < set->page->cnt || bt_getid (set->page->right) )
3399                 return slot;
3400           else
3401                 return 0;
3402
3403         bt_unlockpage(BtLockRead, set->latch, thread_no, __LINE__);
3404         bt_unpinlatch (set->latch, 0, thread_no, __LINE__);
3405
3406         if( page_no = bt_getid(set->page->right) )
3407           if( set->latch = bt_pinleaf (mgr, page_no, thread_no) )
3408                 set->page = bt_mappage (mgr, set->latch);
3409           else
3410                 return 0;
3411         else
3412           return 0; // EOF
3413
3414         // obtain access lock using lock chaining with Access mode
3415
3416         bt_lockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3417         bt_lockpage(BtLockRead, set->latch, thread_no, __LINE__);
3418         bt_unlockpage(BtLockAccess, set->latch, thread_no, __LINE__);
3419         slot = 0;
3420   }
3421 }
3422
3423 //  advance to next key
3424
3425 BTERR bt_nextkey (BtDb *bt)
3426 {
3427 int cmp;
3428
3429   // first advance last key(s) one next slot
3430
3431   while( 1 ) {
3432         switch( bt->phase ) {
3433         case 0:
3434           bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3435           break;
3436         case 1:
3437           bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3438           break;
3439         case 2:
3440           bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot, bt->thread_no);
3441           bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot, bt->thread_no);
3442         break;
3443         }
3444
3445   // return next key
3446
3447         if( bt->cacheslot ) {
3448           bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3449           bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3450           bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3451         }
3452
3453         if( bt->mainslot ) {
3454           bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3455           bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3456           bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3457         }
3458
3459         if( bt->mainslot && bt->cacheslot )
3460           cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3461         else if( bt->mainslot )
3462           cmp = 1;
3463         else if( bt->cacheslot )
3464           cmp = -1;
3465         else
3466           return 0;
3467
3468         //  main key is larger
3469         //      return smaller key
3470
3471         if( cmp < 0 ) {
3472           bt->phase = 0;
3473           if( bt->cachenode->type == Delete )
3474                 continue;
3475           return bt->cacheslot;
3476         }
3477
3478         //  cache key is larger
3479
3480         if( cmp > 0 ) {
3481           bt->phase = 1;
3482           return bt->mainslot;
3483         }
3484
3485         //      keys are equal
3486
3487         bt->phase = 2;
3488
3489         if( bt->cachenode->type == Delete )
3490                 continue;
3491
3492         return bt->cacheslot;
3493   }
3494 }
3495
3496 //  start sweep of keys
3497
3498 BTERR bt_startkey (BtDb *bt, unsigned char *key, uint len)
3499 {
3500 BtPageSet set[1];
3501 uint slot;
3502
3503         // cache btree page
3504
3505         if( slot = bt_loadpage (bt->mgr, bt->cacheset, key, len, 0, BtLockRead, bt->thread_no) )
3506                 bt->cacheslot = slot - 1;
3507         else
3508           return bt->mgr->err;
3509
3510         // main btree page
3511
3512         if( slot = bt_loadpage (bt->main, bt->mainset, key, len, 0, BtLockRead, bt->thread_no) )
3513                 bt->mainslot = slot - 1;
3514         else
3515           return bt->mgr->err;
3516
3517         bt->phase = 2;
3518         return 0;
3519 }
3520
3521 #ifdef STANDALONE
3522
3523 #ifndef unix
3524 double getCpuTime(int type)
3525 {
3526 FILETIME crtime[1];
3527 FILETIME xittime[1];
3528 FILETIME systime[1];
3529 FILETIME usrtime[1];
3530 SYSTEMTIME timeconv[1];
3531 double ans = 0;
3532
3533         memset (timeconv, 0, sizeof(SYSTEMTIME));
3534
3535         switch( type ) {
3536         case 0:
3537                 GetSystemTimeAsFileTime (xittime);
3538                 FileTimeToSystemTime (xittime, timeconv);
3539                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3540                 break;
3541         case 1:
3542                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3543                 FileTimeToSystemTime (usrtime, timeconv);
3544                 break;
3545         case 2:
3546                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3547                 FileTimeToSystemTime (systime, timeconv);
3548                 break;
3549         }
3550
3551         ans += (double)timeconv->wHour * 3600;
3552         ans += (double)timeconv->wMinute * 60;
3553         ans += (double)timeconv->wSecond;
3554         ans += (double)timeconv->wMilliseconds / 1000;
3555         return ans;
3556 }
3557 #else
3558 #include <time.h>
3559 #include <sys/resource.h>
3560
3561 double getCpuTime(int type)
3562 {
3563 struct rusage used[1];
3564 struct timeval tv[1];
3565
3566         switch( type ) {
3567         case 0:
3568                 gettimeofday(tv, NULL);
3569                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3570
3571         case 1:
3572                 getrusage(RUSAGE_SELF, used);
3573                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3574
3575         case 2:
3576                 getrusage(RUSAGE_SELF, used);
3577                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3578         }
3579
3580         return 0;
3581 }
3582 #endif
3583
3584 void bt_poolaudit (BtMgr *mgr, char *type)
3585 {
3586 BtLatchSet *latch, test[1];
3587 uint entry;
3588
3589         memset (test, 0, sizeof(test));
3590
3591         if( memcmp (test, mgr->latchsets, sizeof(test)) )
3592                 fprintf(stderr, "%s latchset zero overwritten\n", type);
3593
3594         if( memcmp (test, mgr->leafsets, sizeof(test)) )
3595                 fprintf(stderr, "%s leafset zero overwritten\n", type);
3596
3597         for( entry = 0; ++entry < mgr->latchtotal; ) {
3598                 latch = mgr->latchsets + entry;
3599
3600                 if( latch->leaf )
3601                         fprintf(stderr, "%s latchset %d is a leaf on page %d\n", type, entry, latch->page_no);
3602
3603                 if( *latch->modify->value )
3604                         fprintf(stderr, "%s latchset %d modifylocked for page %d\n", type, entry, latch->page_no);
3605
3606                 if( latch->pin & ~CLOCK_bit )
3607                         fprintf(stderr, "%s latchset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3608         }
3609
3610         for( entry = 0; ++entry < mgr->leaftotal; ) {
3611                 latch = mgr->leafsets + entry;
3612
3613                 if( !latch->leaf )
3614                         fprintf(stderr, "%s leafset %d is not a leaf on page %d\n", type, entry, latch->page_no);
3615
3616                 if( *latch->modify->value )
3617                         fprintf(stderr, "%s leafset %d modifylocked for page %d\n", type, entry, latch->page_no);
3618
3619                 if( latch->pin & ~CLOCK_bit )
3620                         fprintf(stderr, "%s leafset %d pinned %d times for page %d\n", type, entry, latch->pin & ~CLOCK_bit, latch->page_no);
3621         }
3622 }
3623
3624 typedef struct {
3625         char idx;
3626         char *type;
3627         char *infile;
3628         BtMgr *main;
3629         BtMgr *mgr;
3630         int num;
3631 } ThreadArg;
3632
3633 //  standalone program to index file of keys
3634 //  then list them onto std-out
3635
3636 #ifdef unix
3637 void *index_file (void *arg)
3638 #else
3639 uint __stdcall index_file (void *arg)
3640 #endif
3641 {
3642 int line = 0, found = 0, cnt = 0, cachecnt, idx;
3643 uid next, page_no = LEAF_page;  // start on first page of leaves
3644 int ch, len = 0, slot, type = 0;
3645 unsigned char key[BT_maxkey];
3646 unsigned char buff[65536];
3647 uint nxt = sizeof(buff);
3648 ThreadArg *args = arg;
3649 BtPageSet set[1];
3650 BtPage page;
3651 int vallen;
3652 BtKey *ptr;
3653 BtVal *val;
3654 BtDb *bt;
3655 FILE *in;
3656
3657         bt = bt_open (args->mgr, args->main);
3658         page = (BtPage)buff;
3659
3660         if( args->idx < strlen (args->type) )
3661                 ch = args->type[args->idx];
3662         else
3663                 ch = args->type[strlen(args->type) - 1];
3664
3665         switch(ch | 0x20)
3666         {
3667         case 'd':
3668                 type = Delete;
3669
3670         case 'p':
3671                 if( !type )
3672                         type = Unique;
3673
3674                 if( args->num )
3675                  if( type == Delete )
3676                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3677                  else
3678                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3679                 else
3680                  if( type == Delete )
3681                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3682                  else
3683                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3684
3685                 if( in = fopen (args->infile, "rb") )
3686                   while( ch = getc(in), ch != EOF )
3687                         if( ch == '\n' )
3688                         {
3689                           line++;
3690
3691                           if( !args->num ) {
3692                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique, bt->thread_no) )
3693                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3694                             len = 0;
3695                                 continue;
3696                           }
3697
3698                           nxt -= len - 10;
3699                           memcpy (buff + nxt, key + 10, len - 10);
3700                           nxt -= 1;
3701                           buff[nxt] = len - 10;
3702                           nxt -= 10;
3703                           memcpy (buff + nxt, key, 10);
3704                           nxt -= 1;
3705                           buff[nxt] = 10;
3706                           slotptr(page,++cnt)->off  = nxt;
3707                           slotptr(page,cnt)->type = type;
3708                           len = 0;
3709
3710                           if( cnt < args->num )
3711                                 continue;
3712
3713                           page->cnt = cnt;
3714                           page->act = cnt;
3715                           page->min = nxt;
3716
3717                           if( bt_atomictxn (bt, page) )
3718                                 fprintf(stderr, "Error %d Line: %d by %d source: %d\n", bt->mgr->err, bt->mgr->line, bt->mgr->err_thread, line), exit(0);
3719                           nxt = sizeof(buff);
3720                           cnt = 0;
3721
3722                         }
3723                         else if( len < BT_maxkey )
3724                                 key[len++] = ch;
3725                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3726                 break;
3727
3728         case 'w':
3729                 fprintf(stderr, "started indexing for %s\n", args->infile);
3730                 if( in = fopen (args->infile, "r") )
3731                   while( ch = getc(in), ch != EOF )
3732                         if( ch == '\n' )
3733                         {
3734                           line++;
3735
3736                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique, bt->thread_no) )
3737                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3738                           len = 0;
3739                         }
3740                         else if( len < BT_maxkey )
3741                                 key[len++] = ch;
3742                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3743                 break;
3744
3745         case 'f':
3746                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3747                 if( in = fopen (args->infile, "rb") )
3748                   while( ch = getc(in), ch != EOF )
3749                         if( ch == '\n' )
3750                         {
3751                           line++;
3752                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3753                                 found++;
3754                           else if( bt->mgr->err )
3755                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3756                           len = 0;
3757                         }
3758                         else if( len < BT_maxkey )
3759                                 key[len++] = ch;
3760                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3761                 break;
3762
3763         case 's':
3764                 fprintf(stderr, "started forward scan\n");
3765                 if( bt_startkey (bt, NULL, 0) )
3766                         fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3767
3768                 while( bt_nextkey (bt) ) {
3769                   if( bt->phase == 1 ) {
3770                         len = bt->mainkey->len;
3771
3772                         if( bt->mainnode->type == Duplicate )
3773                                 len -= BtId;
3774
3775                         fwrite (bt->mainkey->key, len, 1, stdout);
3776                         fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3777                   } else {
3778                         len = bt->cachekey->len;
3779
3780                         if( bt->cachenode->type == Duplicate )
3781                                 len -= BtId;
3782
3783                         fwrite (bt->cachekey->key, len, 1, stdout);
3784                         fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3785                   }
3786
3787                   fputc ('\n', stdout);
3788                   cnt++;
3789             }
3790
3791                 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3792                 bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
3793
3794                 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3795                 bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
3796
3797                 fprintf(stderr, " Total keys read %d\n", cnt);
3798                 break;
3799
3800         case 'r':
3801                 fprintf(stderr, "started reverse scan\n");
3802                 if( bt_lastkey (bt) )
3803                         fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3804
3805                 while( bt_prevkey (bt) ) {
3806                   if( bt->phase == 1 ) {
3807                         len = bt->mainkey->len;
3808
3809                         if( bt->mainnode->type == Duplicate )
3810                                 len -= BtId;
3811
3812                         fwrite (bt->mainkey->key, len, 1, stdout);
3813                         fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3814                   } else {
3815                         len = bt->cachekey->len;
3816
3817                         if( bt->cachenode->type == Duplicate )
3818                                 len -= BtId;
3819
3820                         fwrite (bt->cachekey->key, len, 1, stdout);
3821                         fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3822                   }
3823
3824                   fputc ('\n', stdout);
3825                   cnt++;
3826             }
3827
3828                 bt_unlockpage(BtLockRead, bt->cacheset->latch, bt->thread_no, __LINE__);
3829                 bt_unpinlatch (bt->cacheset->latch, 0, bt->thread_no, __LINE__);
3830
3831                 bt_unlockpage(BtLockRead, bt->mainset->latch, bt->thread_no, __LINE__);
3832                 bt_unpinlatch (bt->mainset->latch, 0, bt->thread_no, __LINE__);
3833
3834                 fprintf(stderr, " Total keys read %d\n", cnt);
3835                 break;
3836
3837         case 'c':
3838                 fprintf(stderr, "started counting LSM cache btree\n");
3839                 next = bt->mgr->redopage + bt->mgr->pagezero->redopages;
3840                 page_no = LEAF_page;
3841
3842 #ifdef unix
3843                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3844 #endif
3845                 while( page_no < bt_getid(bt->mgr->pagezero->alloc->right) ) {
3846                   pread(bt->mgr->idx, page, sizeof(*page), page_no << bt->mgr->page_bits);
3847                   if( !page->lvl && !page->free )
3848                     cnt += page->act;
3849                   if( next )
3850                         page_no = next;
3851                   else
3852                         page_no += page->lvl ? 1 : (1 << bt->mgr->leaf_xtra);
3853                   next = 0;
3854                 }
3855                 
3856                 cachecnt = --cnt;       // remove stopper key
3857                 cnt = 0;
3858
3859                 fprintf(stderr, "started counting LSM main btree\n");
3860                 next = bt->main->redopage + bt->main->pagezero->redopages;
3861                 page_no = LEAF_page;
3862
3863 #ifdef unix
3864                 posix_fadvise( bt->main->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3865 #endif
3866                 while( page_no < bt_getid(bt->main->pagezero->alloc->right) ) {
3867                   pread(bt->main->idx, page, sizeof(*page), page_no << bt->main->page_bits);
3868                   if( !page->lvl )
3869                     cnt += page->act;
3870                   if( next )
3871                         page_no = next;
3872                   else
3873                         page_no += page->lvl ? 1 : (1 << bt->main->leaf_xtra);
3874                   next = 0;
3875                 }
3876                 
3877                 cnt--;  // remove stopper key
3878
3879                 fprintf(stderr, " cache keys counted %d\n", cachecnt);
3880                 fprintf(stderr, " main keys counted %d\n", cnt);
3881                 fprintf(stderr, " Total keys counted %d\n", cnt + cachecnt);
3882                 break;
3883         }
3884
3885         bt_close (bt);
3886 #ifdef unix
3887         return NULL;
3888 #else
3889         return 0;
3890 #endif
3891 }
3892
3893 typedef struct timeval timer;
3894
3895 int main (int argc, char **argv)
3896 {
3897 int idx, cnt, len, slot, err;
3898 double start, stop;
3899 #ifdef unix
3900 pthread_t *threads;
3901 #else
3902 HANDLE *threads;
3903 #endif
3904 ThreadArg *args;
3905 uint mainleafpool = 0;
3906 uint mainleafxtra = 0;
3907 uint redopages = 0;
3908 uint poolsize = 0;
3909 uint leafpool = 0;
3910 uint leafxtra = 0;
3911 uint mainpool = 0;
3912 uint mainbits = 0;
3913 int bits = 16;
3914 float elapsed;
3915 int num = 0;
3916 char key[1];
3917 BtMgr *main;
3918 BtMgr *mgr;
3919 BtKey *ptr;
3920
3921         if( argc < 3 ) {
3922                 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize leafpool txnsize redopages mainbits mainleafbits mainpool mainleafpool src_file1 src_file2 ... ]\n", argv[0]);
3923                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3924                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3925                 fprintf (stderr, "  cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort, with a one character command for each input src_file. Commands can also be given with no input file\n");
3926                 fprintf (stderr, "  pagebits is the page size in bits for the cache btree\n");
3927                 fprintf (stderr, "  leafbits is the number of xtra bits for a leaf page\n");
3928                 fprintf (stderr, "  poolsize is the number of pages in buffer pool for the cache btree\n");
3929                 fprintf (stderr, "  leafpool is the number of leaf pages in leaf pool for the cache btree\n");
3930                 fprintf (stderr, "  txnsize = n to block transactions into n units, or zero for no transactions\n");
3931                 fprintf (stderr, "  redopages = n to implement recovery buffer with n pages, or zero for no recovery buffer\n");
3932                 fprintf (stderr, "  mainbits is the page size of the main btree in bits\n");
3933                 fprintf (stderr, "  mainpool is the number of main pages in the main buffer pool\n");
3934                 fprintf (stderr, "  mainleaf is the number of main pages in the main leaf pool\n");
3935                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3936                 exit(0);
3937         }
3938
3939         start = getCpuTime(0);
3940
3941         if( argc > 4 )
3942                 bits = atoi(argv[4]);
3943
3944         if( argc > 5 )
3945                 leafxtra = atoi(argv[5]);
3946
3947         if( argc > 6 )
3948                 poolsize = atoi(argv[6]);
3949
3950         if( !poolsize )
3951                 fprintf (stderr, "no page pool\n"), exit(1);
3952
3953         if( argc > 7 )
3954                 leafpool = atoi(argv[7]);
3955
3956         if( argc > 8 )
3957                 num = atoi(argv[8]);
3958
3959         if( argc > 9 )
3960                 redopages = atoi(argv[9]);
3961
3962         if( redopages > 65535 )
3963                 fprintf (stderr, "Recovery buffer too large\n"), exit(1);
3964
3965         if( argc > 10 )
3966                 mainbits = atoi(argv[10]);
3967
3968         if( argc > 11 )
3969                 mainleafxtra = atoi(argv[11]);
3970
3971         if( argc > 12 )
3972                 mainpool = atoi(argv[12]);
3973
3974         if( argc > 13 )
3975                 mainleafpool = atoi(argv[13]);
3976
3977         cnt = argc - 14;
3978 #ifdef unix
3979         threads = malloc (cnt * sizeof(pthread_t));
3980 #else
3981         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3982 #endif
3983         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3984
3985         mgr = bt_mgr (argv[1], bits, leafxtra, poolsize, leafpool, redopages);
3986
3987         if( !mgr ) {
3988                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3989                 exit (1);
3990         } else
3991                 mgr->type = 0;
3992
3993         if( mainbits ) {
3994                 main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool, mainleafpool, 0);
3995
3996                 if( !main ) {
3997                         fprintf(stderr, "Index Open Error %s\n", argv[2]);
3998                         exit (1);
3999                 } else
4000                         main->type = 1;
4001         } else
4002                 main = NULL;
4003
4004         //      fire off threads
4005
4006         if( cnt )
4007           for( idx = 0; idx < cnt; idx++ ) {
4008                 args[idx].infile = argv[idx + 14];
4009                 args[idx].type = argv[3];
4010                 args[idx].main = main;
4011                 args[idx].mgr = mgr;
4012                 args[idx].num = num;
4013                 args[idx].idx = idx;
4014 #ifdef unix
4015                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
4016                         fprintf(stderr, "Error creating thread %d\n", err);
4017 #else
4018                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
4019 #endif
4020           }
4021         else {
4022                 args[0].infile = argv[idx + 10];
4023                 args[0].type = argv[3];
4024                 args[0].main = main;
4025                 args[0].mgr = mgr;
4026                 args[0].num = num;
4027                 args[0].idx = 0;
4028                 index_file (args);
4029         }
4030
4031         //      wait for termination
4032
4033 #ifdef unix
4034         for( idx = 0; idx < cnt; idx++ )
4035                 pthread_join (threads[idx], NULL);
4036 #else
4037         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
4038
4039         for( idx = 0; idx < cnt; idx++ )
4040                 CloseHandle(threads[idx]);
4041 #endif
4042         bt_poolaudit(mgr, "cache");
4043
4044         if( main )
4045                 bt_poolaudit(main, "main");
4046
4047         fprintf(stderr, "cache %lld leaves %lld upper %d reads %d writes %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->reads, mgr->writes, mgr->found);
4048         if( main )
4049                 fprintf(stderr, "main %lld leaves %lld upper %d reads %d writes %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->reads, main->writes, main->found);
4050
4051         bt_mgrclose (mgr);
4052
4053         if( main )
4054                 bt_mgrclose (main);
4055
4056         elapsed = getCpuTime(0) - start;
4057         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4058         elapsed = getCpuTime(1);
4059         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4060         elapsed = getCpuTime(2);
4061         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
4062 }
4063
4064 BtKey *bt_key (BtPage page, uint slot)
4065 {
4066 return keyptr(page,slot);
4067 }
4068
4069 BtSlot *bt_slot (BtPage page, uint slot)
4070 {
4071 return slotptr(page,slot);
4072 }
4073 #endif  //STANDALONE