]> pd.if.org Git - btree/blob - threadskv10h.c
Add multi/thread multi-process threadskv10h.c
[btree] / threadskv10h.c
1 // btree version threadskv10h futex version
2 //      with reworked bt_deletekey code,
3 //      phase-fair re-entrant reader writer lock,
4 //      librarian page split code,
5 //      duplicate key management
6 //      bi-directional cursors
7 //      ACID batched key-value updates
8 //      LSM B-trees for write optimization
9 //      larger sized leaf pages than non-leaf
10 //      and LSM B-tree find & count operations
11
12 // 15 DEC 2014
13
14 // author: karl malbrain, malbrain@cal.berkeley.edu
15
16 /*
17 This work, including the source code, documentation
18 and related data, is placed into the public domain.
19
20 The orginal author is Karl Malbrain.
21
22 THIS SOFTWARE IS PROVIDED AS-IS WITHOUT WARRANTY
23 OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF
24 MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE,
25 ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE
26 RESULTING FROM THE USE, MODIFICATION, OR
27 REDISTRIBUTION OF THIS SOFTWARE.
28 */
29
30 // Please see the project home page for documentation
31 // code.google.com/p/high-concurrency-btree
32
33 #define _FILE_OFFSET_BITS 64
34 #define _LARGEFILE64_SOURCE
35
36 #ifdef linux
37 #define _GNU_SOURCE
38 #include <xmmintrin.h>
39 #include <linux/futex.h>
40 #include <sys/syscall.h>
41 #endif
42
43 #ifdef unix
44 #include <unistd.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <fcntl.h>
48 #include <sys/time.h>
49 #include <sys/mman.h>
50 #include <errno.h>
51 #include <pthread.h>
52 #include <limits.h>
53 #else
54 #define WIN32_LEAN_AND_MEAN
55 #include <windows.h>
56 #include <stdio.h>
57 #include <stdlib.h>
58 #include <time.h>
59 #include <fcntl.h>
60 #include <process.h>
61 #include <intrin.h>
62 #endif
63
64 #include <memory.h>
65 #include <string.h>
66 #include <stddef.h>
67
68 typedef unsigned long long      uid;
69 typedef unsigned long long      logseqno;
70
71 #ifndef unix
72 typedef unsigned long long      off64_t;
73 typedef unsigned short          ushort;
74 typedef unsigned int            uint;
75 #endif
76
77 #define BT_ro 0x6f72    // ro
78 #define BT_rw 0x7772    // rw
79
80 #define BT_maxbits              26                                      // maximum page size in bits
81 #define BT_minbits              9                                       // minimum page size in bits
82 #define BT_minpage              (1 << BT_minbits)       // minimum page size
83 #define BT_maxpage              (1 << BT_maxbits)       // maximum page size
84
85 //  BTree page number constants
86 #define ALLOC_page              0       // allocation page
87 #define ROOT_page               1       // root of the btree
88 #define LATCH_page              2       // first page of latches
89
90 #define SEG_bits                16      // number of leaf pages in a segment in bits
91 #define MIN_seg                 32      // initial number of mapping segments
92
93 //      Number of levels to create in a new BTree
94 #define MIN_lvl                 2
95
96 /*
97 There are six lock types for each node in four independent sets: 
98 1. (set 1) AccessIntent: Sharable. Going to Read the node. Incompatible with NodeDelete. 
99 2. (set 1) NodeDelete: Exclusive. About to release the node. Incompatible with AccessIntent. 
100 3. (set 2) ReadLock: Sharable. Read the node. Incompatible with WriteLock. 
101 4. (set 2) WriteLock: Exclusive. Modify the node. Incompatible with ReadLock and other WriteLocks. 
102 5. (set 3) ParentModification: Exclusive. Change the node's parent keys. Incompatible with another ParentModification. 
103 6. (set 4) LinkModification: Exclusive. Update of a node's left link is underway. Incompatible with another LinkModification. 
104 */
105
106 typedef enum{
107         BtLockAccess = 1,
108         BtLockDelete = 2,
109         BtLockRead   = 4,
110         BtLockWrite  = 8,
111         BtLockParent = 16,
112         BtLockLink   = 32
113 } BtLock;
114
115 typedef struct {
116   union {
117         struct {
118           volatile unsigned char xcl[1];
119           volatile unsigned char filler;
120           volatile ushort waiters[1];
121         } bits[1];
122         uint value[1];
123   };
124 } MutexLatch;
125
126 //      definition for reader/writer reentrant lock implementation
127
128 typedef struct {
129   MutexLatch xcl[1];
130   MutexLatch wrt[1];
131   ushort readers;       // number of readers holding lock
132 #ifdef DEBUG
133   ushort line;          // owner source line number
134 #endif
135   ushort dup;           // re-entrant lock count
136   pid_t tid;            // owner pid
137 } RWLock;
138
139 //  hash table entries
140
141 typedef struct {
142         MutexLatch latch[1];
143         uint entry;             // Latch table entry at head of chain
144 } BtHashEntry;
145
146 //      latch manager table structure
147
148 typedef struct {
149         uid page_no;                    // latch set page number
150         MutexLatch modify[1];   // modify entry lite latch
151         RWLock readwr[1];               // read/write page lock
152         RWLock access[1];               // Access Intent/Page delete
153         RWLock parent[1];               // Posting of fence key in parent
154         RWLock link[1];                 // left link update in progress
155         uint split;                             // right split page atomic insert
156         uint next;                              // next entry in hash table chain
157         uint prev;                              // prev entry in hash table chain
158         uint pin;                               // number of accessing threads
159 } BtLatchSet;
160
161 //      Define the length of the page record numbers
162
163 #define BtId 6
164
165 //      Page key slot definition.
166
167 //      Keys are marked dead, but remain on the page until
168 //      it cleanup is called. The fence key (highest key) for
169 //      a leaf page is always present, even after cleanup.
170
171 //      Slot types
172
173 //      In addition to the Unique keys that occupy slots
174 //      there are Librarian and Duplicate key
175 //      slots occupying the key slot array.
176
177 //      The Librarian slots are dead keys that
178 //      serve as filler, available to add new Unique
179 //      or Dup slots that are inserted into the B-tree.
180
181 //      The Duplicate slots have had their key bytes extended
182 //      by 6 bytes to contain a binary duplicate key uniqueifier.
183
184 typedef enum {
185         Unique,
186         Update,
187         Librarian,
188         Duplicate,
189         Delete
190 } BtSlotType;
191
192 typedef struct {
193         uint off:BT_maxbits;    // page offset for key start
194         uint type:3;                    // type of slot
195         uint dead:1;                    // set for deleted slot
196 } BtSlot;
197
198 //      The key structure occupies space at the upper end of
199 //      each page.  It's a length byte followed by the key
200 //      bytes.
201
202 typedef struct {
203         unsigned char len;              // this can be changed to a ushort or uint
204         unsigned char key[0];
205 } BtKey;
206
207 //      the value structure also occupies space at the upper
208 //      end of the page. Each key is immediately followed by a value.
209
210 typedef struct {
211         unsigned char len;              // this can be changed to a ushort or uint
212         unsigned char value[0];
213 } BtVal;
214
215 #define BT_maxkey       255             // maximum number of bytes in a key
216 #define BT_keyarray (BT_maxkey + sizeof(BtKey))
217
218 //      The first part of an index page.
219 //      It is immediately followed
220 //      by the BtSlot array of keys.
221
222 typedef struct BtPage_ {
223         uint cnt;                                       // count of keys in page
224         uint act;                                       // count of active keys
225         uint min;                                       // next key/value offset
226         uint fence;                                     // page fence key offset
227         uint garbage;                           // page garbage in bytes
228         unsigned char lvl;                      // level of page, zero = leaf
229         unsigned char free;                     // page is on the free chain
230         unsigned char kill;                     // page is being deleted
231         unsigned char nopromote;        // page is being constructed
232         uid right, left;                        // page numbers to right and left
233 } *BtPage;
234
235 //  The loadpage interface object
236
237 typedef struct {
238         BtPage page;            // current page pointer
239         BtLatchSet *latch;      // current page latch set
240 } BtPageSet;
241
242 //      structure for latch manager on shared ALLOC_page
243
244 typedef struct {
245         uid allocpage;                                  // page number of first available page
246         uid freechain;                                  // head of free page_nos chain
247         uid leafchain;                                  // head of leaf page_nos chain
248         uid leaf_page;                                  // page number of leftmost leaf
249         uid rightleaf;                                  // page number of rightmost leaf
250         uid leafpromote;                                // next leaf page to try promotion
251         unsigned long long leafpages;   // number of active leaf pages
252         unsigned long long upperpages;  // number of active upper pages
253         unsigned char leaf_xtra;                // leaf page size in xtra bits
254         unsigned char page_bits;                // base page size in bits
255         uint nlatchpage;                                // size of buffer pool & latchsets
256         uint latchtotal;                                // number of page latch entries
257         uint latchvictim;                               // next latch entry to test for pin
258         uint latchhash;                                 // number of latch hash table slots
259         MutexLatch lock[1];                             // allocation area lite latch
260         MutexLatch promote[1];                  // promotion lite latch
261 } BtPageZero;
262
263 //      The object structure for Btree access
264
265 typedef struct {
266         uint page_size;                         // base page size       
267         uint page_bits;                         // base page size in bits       
268         uint leaf_xtra;                         // leaf xtra bits       
269 #ifdef unix
270         int idx;
271 #else
272         HANDLE idx;
273 #endif
274         BtPageZero *pagezero;           // mapped allocation page
275         BtHashEntry *hashtable;         // the buffer pool hash table entries
276         BtLatchSet *latchsets;          // mapped latch set from buffer pool
277         uint maxleaves;                         // leaf page count to begin promote
278         int err;                                        // last error
279         int line;                                       // last error line no
280         int found;                                      // number of keys found by delete
281         int type;                                       // type of LSM tree 0=cache, 1=main
282         uint maxseg;                            // max number of memory mapped segments
283         uint segments;                          // number of memory mapped segments in use
284         MutexLatch maps[1];                     // segment table mutex
285         unsigned char **pages;          // memory mapped segments of b-tree
286 } BtMgr;
287
288 typedef struct {
289         BtMgr *mgr;                                     // buffer manager for entire process
290         BtMgr *main;                            // buffer manager for main btree
291         pid_t tid;                                      // thread-id of thread
292         BtPageSet cacheset[1];          // cached page frame for cache btree
293         BtPageSet mainset[1];           // cached page frame for main btree
294         uint cacheslot;                         // slot number in cacheset
295         uint mainslot;                          // slot number in mainset
296         ushort phase;                           // 1 = main btree 0 = cache btree 2 = both
297         BtSlot *cachenode;
298         BtSlot *mainnode;
299         BtKey *cachekey;
300         BtKey *mainkey;
301         BtVal *cacheval;
302         BtVal *mainval;
303 } BtDb;
304
305 typedef struct {
306         uint entry:31;          // latch table entry number
307         uint reuse:1;           // reused previous page
308         uint slot;                      // slot on page
309         uint src;                       // source slot
310 } AtomicTxn;
311
312 //      Catastrophic errors
313
314 typedef enum {
315         BTERR_ok = 0,
316         BTERR_struct,
317         BTERR_ovflw,
318         BTERR_lock,
319         BTERR_map,
320         BTERR_read,
321         BTERR_wrt,
322         BTERR_atomic
323 } BTERR;
324
325 // B-Tree functions
326
327 extern void bt_close (BtDb *bt);
328 extern BtDb *bt_open (BtMgr *mgr, BtMgr *main);
329 extern BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf);
330 extern void bt_lockpage(BtLock mode, BtLatchSet *latch, pid_t tid, uint line);
331 extern void bt_unlockpage(BtLock mode, BtLatchSet *latch, uint line);
332 extern BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint len, uint lvl, void *value, uint vallen, BtSlotType type);
333 extern BTERR  bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl);
334
335 extern int bt_findkey (BtDb *db, unsigned char *key, uint keylen, unsigned char *value, uint valmax);
336
337 extern BTERR bt_startkey (BtDb *db, unsigned char *key, uint len);
338 extern BTERR bt_nextkey (BtDb *bt);
339
340 extern uint bt_lastkey (BtDb *bt);
341 extern uint bt_prevkey (BtDb *bt);
342
343 //      manager functions
344 extern BtMgr *bt_mgr (char *name, uint bits, uint leaf_xtra, uint poolsize);
345 extern void bt_mgrclose (BtMgr *mgr);
346
347 //      atomic transaction functions
348 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, pid_t tid);
349 BTERR bt_promote (BtDb *bt);
350
351 //  The page is allocated from low and hi ends.
352 //  The key slots are allocated from the bottom,
353 //      while the text and value of the key
354 //  are allocated from the top.  When the two
355 //  areas meet, the page is split into two.
356
357 //  A key consists of a length byte, two bytes of
358 //  index number (0 - 65535), and up to 253 bytes
359 //  of key value.
360
361 //  Associated with each key is a value byte string
362 //      containing any value desired.
363
364 //  The b-tree root is always located at page 1.
365 //      The first leaf page of level zero is always
366 //      located on page 2.
367
368 //      The b-tree pages are linked with next
369 //      pointers to facilitate enumerators,
370 //      and provide for concurrency.
371
372 //      When to root page fills, it is split in two and
373 //      the tree height is raised by a new root at page
374 //      one with two keys.
375
376 //      Deleted keys are marked with a dead bit until
377 //      page cleanup. The fence key for a leaf node is
378 //      always present
379
380 //  To achieve maximum concurrency one page is locked at a time
381 //  as the tree is traversed to find leaf key in question. The right
382 //  page numbers are used in cases where the page is being split,
383 //      or consolidated.
384
385 //  Page 0 is dedicated to lock for new page extensions,
386 //      and chains empty pages together for reuse. It also
387 //      contains the latch manager hash table.
388
389 //      The ParentModification lock on a node is obtained to serialize posting
390 //      or changing the fence key for a node.
391
392 //      Empty pages are chained together through the ALLOC page and reused.
393
394 //      Access macros to address slot and key values from the page
395 //      Page slots use 1 based indexing.
396
397 #define slotptr(page, slot) (((BtSlot *)(page+1)) + ((slot)-1))
398 #define keyptr(page, slot) ((BtKey*)((unsigned char*)(page) + slotptr(page, slot)->off))
399 #define valptr(page, slot) ((BtVal*)(keyptr(page,slot)->key + keyptr(page,slot)->len))
400 #define fenceptr(page) ((BtKey*)((unsigned char*)(page) + page->fence))
401
402 void bt_putid(unsigned char *dest, uid id)
403 {
404 int i = BtId;
405
406         while( i-- )
407                 dest[i] = (unsigned char)id, id >>= 8;
408 }
409
410 uid bt_getid(unsigned char *src)
411 {
412 uid id = 0;
413 int i;
414
415         for( i = 0; i < BtId; i++ )
416                 id <<= 8, id |= *src++; 
417
418         return id;
419 }
420
421 //      lite weight spin lock Latch Manager
422
423 pid_t sys_gettid ()
424 {
425         return syscall(SYS_gettid);
426 }
427
428 int sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3)
429 {
430         return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
431 }
432
433 void bt_mutexlock(MutexLatch *latch)
434 {
435 uint idx, waited = 0;
436 MutexLatch prev[1];
437
438  while( 1 ) {
439   for( idx = 0; idx < 100; idx++ ) {
440         *prev->value = __sync_fetch_and_or (latch->value, 1);
441         if( !*prev->bits->xcl ) {
442           if( waited )
443                 __sync_fetch_and_sub (latch->bits->waiters, 1);
444           return;
445         }
446   }
447
448   if( !waited ) {
449         __sync_fetch_and_add (latch->bits->waiters, 1);
450         *prev->bits->waiters += 1;
451         waited++;
452   }
453
454   sys_futex (latch->value, FUTEX_WAIT_PRIVATE, *prev->value, NULL, NULL, 0);
455  }
456 }
457
458 int bt_mutextry(MutexLatch *latch)
459 {
460         return !__sync_lock_test_and_set (latch->bits->xcl, 1);
461 }
462
463 void bt_releasemutex(MutexLatch *latch)
464 {
465 MutexLatch prev[1];
466
467         *prev->value = __sync_fetch_and_and (latch->value, 0xffff0000);
468
469         if( *prev->bits->waiters )
470                 sys_futex( latch->value, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0 );
471 }
472
473 //      reader/writer lock implementation
474
475 void WriteLock (RWLock *lock, pid_t tid, uint line)
476 {
477         if( tid && lock->tid == tid ) {
478                 lock->dup++;
479                 return;
480         }
481         bt_mutexlock (lock->xcl);
482         bt_mutexlock (lock->wrt);
483         bt_releasemutex (lock->xcl);
484         lock->tid = tid;
485 #ifdef DEBUG
486         lock->line = line;
487 #endif
488 }
489
490 void WriteRelease (RWLock *lock)
491 {
492         if( lock->dup ) {
493                 lock->dup--;
494                 return;
495         }
496         lock->tid = 0;
497         bt_releasemutex (lock->wrt);
498 }
499
500 void ReadLock (RWLock *lock)
501 {
502         bt_mutexlock (lock->xcl);
503
504         if( !__sync_fetch_and_add (&lock->readers, 1) )
505                 bt_mutexlock (lock->wrt);
506
507         bt_releasemutex (lock->xcl);
508 }
509
510 void ReadRelease (RWLock *lock)
511 {
512         if( __sync_fetch_and_sub (&lock->readers, 1) == 1 )
513                 bt_releasemutex (lock->wrt);
514 }
515
516 //      read page into buffer pool from permanent location in Btree file
517
518 BTERR bt_readpage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
519 {
520 uint page_size = mgr->page_size;
521
522   if( leaf )
523         page_size <<= mgr->leaf_xtra;
524
525   if( pread(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
526         return mgr->err = BTERR_read;
527
528   return 0;
529 }
530
531 //      write page to location in Btree file
532
533 BTERR bt_writepage (BtMgr *mgr, BtPage page, uid page_no, uint leaf)
534 {
535 uint page_size = mgr->page_size;
536
537   if( leaf )
538         page_size <<= mgr->leaf_xtra;
539
540   if( pwrite(mgr->idx, page, page_size, page_no << mgr->page_bits) < page_size )
541         return mgr->err = BTERR_wrt;
542
543   return 0;
544 }
545
546 //      decrement pin count
547
548 void bt_unpinlatch (BtLatchSet *latch)
549 {
550         bt_mutexlock(latch->modify);
551         latch->pin--;
552         bt_releasemutex(latch->modify);
553 }
554
555 //  return the btree cached page address
556
557 BtPage bt_mappage (BtMgr *mgr, BtLatchSet *latch)
558 {
559 uint segment = latch->page_no >> SEG_bits;
560 int flag = PROT_READ | PROT_WRITE;
561 uid mask = (uid)1 << SEG_bits;
562 BtPage page;
563
564   bt_mutexlock (mgr->maps);
565   mask--;
566
567   while( 1 ) {
568         if( segment < mgr->segments ) {
569           page = (BtPage)(mgr->pages[segment] + ((latch->page_no & mask) << mgr->page_bits));
570
571           bt_releasemutex (mgr->maps);
572           return page;
573         }
574
575         if( mgr->segments < mgr->maxseg ) {
576           mgr->pages[mgr->segments] = mmap (0, (uid)mgr->page_size << SEG_bits, flag, MAP_SHARED, mgr->idx, (uid)mgr->segments << mgr->page_bits << SEG_bits);
577           mgr->segments++;
578           continue;
579         }
580
581         mgr->maxseg <<= 1;
582         mgr->pages = realloc (mgr->pages, mgr->maxseg * sizeof(void *));
583   }
584 }
585
586 //  return next available latch entry
587 //        and with latch entry locked
588
589 uint bt_availnext (BtMgr *mgr)
590 {
591 BtLatchSet *latch;
592 uint entry;
593
594   while( 1 ) {
595 #ifdef unix
596         entry = __sync_fetch_and_add (&mgr->pagezero->latchvictim, 1) + 1;
597 #else
598         entry = _InterlockedIncrement (&mgr->pagezero->latchvictim);
599 #endif
600         entry %= mgr->pagezero->latchtotal;
601
602         if( !entry )
603                 continue;
604
605         latch = mgr->latchsets + entry;
606
607         if( !bt_mutextry(latch->modify) )
608                 continue;
609
610         //  return this entry if it is not pinned
611
612         if( !latch->pin )
613                 return entry;
614
615         bt_releasemutex(latch->modify);
616   }
617 }
618
619 //      pin latch in latch pool
620
621 BtLatchSet *bt_pinlatch (BtMgr *mgr, uid page_no)
622 {
623 uint hashidx = page_no % mgr->pagezero->latchhash;
624 uint entry, oldidx;
625 BtLatchSet *latch;
626 BtPage page;
627
628   //  try to find our entry
629
630   bt_mutexlock(mgr->hashtable[hashidx].latch);
631
632   if( entry = mgr->hashtable[hashidx].entry ) do
633   {
634         latch = mgr->latchsets + entry;
635
636         if( page_no == latch->page_no )
637                 break;
638   } while( entry = latch->next );
639
640   //  found our entry: increment pin
641
642   if( entry ) {
643         latch = mgr->latchsets + entry;
644         bt_mutexlock(latch->modify);
645         latch->pin++;
646         bt_releasemutex(latch->modify);
647         bt_releasemutex(mgr->hashtable[hashidx].latch);
648         return latch;
649   }
650
651   //  find and reuse unpinned entry
652
653 trynext:
654   entry = bt_availnext (mgr);
655   latch = mgr->latchsets + entry;
656   oldidx = latch->page_no % mgr->pagezero->latchhash;
657
658   //  skip over this entry if latch not available
659
660   if( latch->page_no )
661    if( oldidx != hashidx )
662     if( !bt_mutextry (mgr->hashtable[oldidx].latch) ) {
663           bt_releasemutex(latch->modify);
664           goto trynext;
665         }
666
667   //  if latch is on a different hash chain
668   //    unlink from the old page_no chain
669
670   if( latch->page_no )
671    if( oldidx != hashidx ) {
672         if( latch->prev )
673           mgr->latchsets[latch->prev].next = latch->next;
674         else
675           mgr->hashtable[oldidx].entry = latch->next;
676
677         if( latch->next )
678           mgr->latchsets[latch->next].prev = latch->prev;
679
680     bt_releasemutex (mgr->hashtable[oldidx].latch);
681    }
682
683   //  link page as head of hash table chain
684   //  if this is a never before used entry,
685   //  or it was previously on a different
686   //  hash table chain. Otherwise, just
687   //  leave it in its current hash table
688   //  chain position.
689
690   if( !latch->page_no || hashidx != oldidx ) {
691         if( latch->next = mgr->hashtable[hashidx].entry )
692           mgr->latchsets[latch->next].prev = entry;
693
694         mgr->hashtable[hashidx].entry = entry;
695     latch->prev = 0;
696   }
697
698   //  fill in latch structure
699
700   latch->page_no = page_no;
701   latch->pin = 1;
702
703   bt_releasemutex (latch->modify);
704   bt_releasemutex (mgr->hashtable[hashidx].latch);
705   return latch;
706 }
707   
708 void bt_mgrclose (BtMgr *mgr)
709 {
710 char *name = mgr->type ? "Main" : "Cache";
711 BtLatchSet *latch;
712 uint num = 0;
713 BtPage page;
714 uint entry;
715
716         //      flush previously written dirty pages
717         //      and write recovery buffer to disk
718
719         fdatasync (mgr->idx);
720
721 #ifdef unix
722         while( mgr->segments )
723                 munmap (mgr->pages[--mgr->segments], (uid)mgr->page_size << SEG_bits);
724 #else
725         while( mgr->segments ) {
726                 FlushViewOfFile(mgr->pages[--mgr->segments], 0);
727                 UnmapViewOfFile(mgr->pages[mgr->Segments]);
728         }
729 #endif
730 #ifdef unix
731         close (mgr->idx);
732         free (mgr);
733 #else
734         FlushFileBuffers(mgr->idx);
735         CloseHandle(mgr->idx);
736         GlobalFree (mgr);
737 #endif
738 }
739
740 //      close and release memory
741
742 void bt_close (BtDb *bt)
743 {
744         free (bt);
745 }
746
747 void bt_initpage (BtMgr *mgr, BtPage page, uid leaf_page_no, uint lvl)
748 {
749 BtSlot *node = slotptr(page, 1);
750 unsigned char value[BtId];
751 uid page_no;
752 BtKey* key;
753 BtVal *val;
754
755         page_no = lvl ? ROOT_page : leaf_page_no;
756         node->off = mgr->page_size;
757
758         if( !lvl )
759                 node->off <<= mgr->leaf_xtra;
760
761         node->off -= 3 + (lvl ? BtId + sizeof(BtVal): sizeof(BtVal));
762         node->type = Librarian;
763         node++->dead = 1;
764
765         node->off = node[-1].off;
766         key = keyptr(page, 2);
767         key = keyptr(page, 1);
768         key->len = 2;           // create stopper key
769         key->key[0] = 0xff;
770         key->key[1] = 0xff;
771
772         bt_putid(value, leaf_page_no);
773         val = valptr(page, 1);
774         val->len = lvl ? BtId : 0;
775         memcpy (val->value, value, val->len);
776
777         page->min = node->off;
778         page->lvl = lvl;
779         page->cnt = 2;
780         page->act = 1;
781
782         if( bt_writepage (mgr, page, page_no, !lvl) ) {
783                 fprintf (stderr, "Unable to create btree page %d\n", page_no);
784                 exit(0);
785         }
786 }
787
788 //  open/create new btree buffer manager
789
790 //      call with file_name, BT_openmode, bits in page size (e.g. 16),
791 //              extra bits for leaves (e.g. 4) size of latch pool (e.g. 500)
792
793 BtMgr *bt_mgr (char *name, uint pagebits, uint leafxtra, uint nodemax)
794 {
795 uint lvl, attr, last, slot, idx, blk;
796 int flag, initit = 0;
797 BtPageZero *pagezero;
798 BtLatchSet *latch;
799 uid leaf_page;
800 off64_t size;
801 BtPage page;
802 uint amt[1];
803 BtMgr* mgr;
804
805         // determine sanity of page size and buffer pool
806
807         if( leafxtra | pagebits )
808           if( leafxtra + pagebits > BT_maxbits )
809                 fprintf (stderr, "pagebits + leafxtra > maxbits\n"), exit(1);
810
811         if( pagebits )
812           if( pagebits < BT_minbits )
813                 fprintf (stderr, "pagebits < minbits\n"), exit(1);
814
815 #ifdef unix
816         mgr = calloc (1, sizeof(BtMgr));
817
818         mgr->idx = open ((char*)name, O_RDWR | O_CREAT, 0666);
819
820         if( mgr->idx == -1 ) {
821                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
822                 return free(mgr), NULL;
823         }
824 #else
825         mgr = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, sizeof(BtMgr));
826         attr = FILE_ATTRIBUTE_NORMAL;
827         mgr->idx = CreateFile(name, GENERIC_READ| GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, attr, NULL);
828
829         if( mgr->idx == INVALID_HANDLE_VALUE ) {
830                 fprintf (stderr, "Unable to create/open btree file %s\n", name);
831                 return GlobalFree(mgr), NULL;
832         }
833 #endif
834
835 #ifdef unix
836         pagezero = valloc (BT_maxpage);
837         page = (BtPage)pagezero;
838         *amt = 0;
839
840         // read minimum page size to get root info
841         //      to support raw disk partition files
842         //      check if page_bits == 0 on the disk.
843
844         if( size = lseek (mgr->idx, 0L, 2) )
845                 if( pread(mgr->idx, pagezero, BT_minpage, 0) == BT_minpage )
846                         if( pagezero->page_bits ) {
847                                 pagebits = pagezero->page_bits;
848                                 leafxtra = pagezero->leaf_xtra;
849                         } else
850                                 initit = 1;
851                 else
852                         return free(mgr), free(pagezero), NULL;
853         else
854                 initit = 1;
855 #else
856         pagezero = VirtualAlloc(NULL, BT_maxpage, MEM_COMMIT, PAGE_READWRITE);
857         size = GetFileSize(mgr->idx, amt);
858
859         if( size || *amt ) {
860                 if( !ReadFile(mgr->idx, (char *)pagezero, BT_minpage, amt, NULL) )
861                         return bt_mgrclose (mgr), NULL;
862                 pagebits = pagezero->page_bits;
863                 leafxtra = pagezero->leaf_xtra;
864         } else
865                 initit = 1;
866 #endif
867
868         mgr->page_size = 1 << pagebits;
869         mgr->page_bits = pagebits;
870         mgr->leaf_xtra = leafxtra;
871
872         if( !initit )
873                 goto mgrlatch;
874
875         //  calculate number of latch table & hash entries
876
877         memset (pagezero, 0, 1 << pagebits);
878         pagezero->nlatchpage = nodemax/16 * sizeof(BtHashEntry);
879
880         pagezero->nlatchpage += sizeof(BtLatchSet) * nodemax + mgr->page_size - 1;
881         pagezero->nlatchpage >>= mgr->page_bits;
882         pagezero->latchtotal = nodemax;
883
884         pagezero->latchhash = (((uid)pagezero->nlatchpage<< mgr->page_bits) - nodemax * sizeof(BtLatchSet)) / sizeof(BtHashEntry);
885
886         // initialize an empty b-tree with alloc page, root page, leaf page
887         // and page(s) of latches and page pool cache
888
889         pagezero->page_bits = mgr->page_bits;
890         pagezero->leaf_xtra = leafxtra;
891         pagezero->upperpages = 1;
892         pagezero->leafpages = 1;
893
894         leaf_page = pagezero->leaf_page = pagezero->nlatchpage + LATCH_page;
895
896         //      round first leafpage up to leafxtra boundary
897
898         if( pagezero->leaf_page & ((1 << leafxtra) - 1)) {
899                 blk = pagezero->leaf_page;
900                 pagezero->leaf_page |= (1 << leafxtra) - 1;
901                 pagezero->freechain = pagezero->leaf_page++;
902                 leaf_page = pagezero->leaf_page;
903         } else
904                 blk = 0;
905
906         pagezero->rightleaf = pagezero->leaf_page;
907         pagezero->allocpage = pagezero->leaf_page + (1 << leafxtra);
908
909         if( pwrite (mgr->idx, pagezero, 1 << pagebits, 0) < 1 << pagebits) {
910                 fprintf (stderr, "Unable to create btree page zero\n");
911                 return bt_mgrclose (mgr), NULL;
912         }
913
914         //      initialize root level 1 page
915
916         memset (page, 0, 1 << pagebits);
917         bt_initpage (mgr, page, leaf_page, 1);
918
919         //  chain unused pages as first freelist
920
921         memset (page, 0, 1 << pagebits);
922
923         while( blk & ((1 << leafxtra) - 1) ) {
924           if( bt_writepage (mgr, page, blk, 0) ) {
925                 fprintf(stderr, "unable to write initial free blk %d\r\n", blk);
926                 exit(1);
927           }
928           page->right = blk++;
929         }
930
931         // initialize first page of leaves
932
933         memset (page, 0, 1 << pagebits);
934         bt_initpage (mgr, page, leaf_page, 0);
935
936 mgrlatch:
937 #ifdef unix
938         free (pagezero);
939 #else
940         VirtualFree (pagezero, 0, MEM_RELEASE);
941 #endif
942
943         //      map first segment
944
945         mgr->segments = 1;
946         mgr->maxseg = MIN_seg;
947         mgr->pages = calloc (MIN_seg, sizeof(unsigned char *));
948
949         flag = PROT_READ | PROT_WRITE;
950         mgr->pages[0] = mmap (0, (uid)mgr->page_size << SEG_bits, flag, MAP_SHARED, mgr->idx, 0);
951
952         if( mgr->pages[0] == MAP_FAILED ) {
953                 fprintf (stderr, "Unable to mmap pagezero btree segment, error = %d\n", errno);
954                 return bt_mgrclose (mgr), NULL;
955         }
956
957         mgr->pagezero = (BtPageZero *)mgr->pages[0];
958         mlock (mgr->pagezero, mgr->page_size);
959
960         //      allocate latch pool
961
962         mgr->latchsets = (BtLatchSet *)(mgr->pages[0] + ((uid)LATCH_page << mgr->page_bits));
963         mgr->hashtable = (BtHashEntry *)(mgr->latchsets + mgr->pagezero->latchtotal);
964
965         return mgr;
966 }
967
968 //      open BTree access method
969 //      based on buffer manager
970
971 BtDb *bt_open (BtMgr *mgr, BtMgr *main)
972 {
973 BtDb *bt = malloc (sizeof(*bt));
974
975         memset (bt, 0, sizeof(*bt));
976         bt->tid = sys_gettid();
977         bt->main = main;
978         bt->mgr = mgr;
979         return bt;
980 }
981
982 //  compare two keys, return > 0, = 0, or < 0
983 //  =0: keys are same
984 //  -1: key2 > key1
985 //  +1: key2 < key1
986 //  as the comparison value
987
988 int keycmp (BtKey* key1, unsigned char *key2, uint len2)
989 {
990 uint len1 = key1->len;
991 int ans;
992
993         if( ans = memcmp (key1->key, key2, len1 > len2 ? len2 : len1) )
994                 return ans;
995
996         if( len1 > len2 )
997                 return 1;
998         if( len1 < len2 )
999                 return -1;
1000
1001         return 0;
1002 }
1003
1004 // place write, read, or parent lock on requested page_no.
1005
1006 void bt_lockpage(BtLock mode, BtLatchSet *latch, pid_t tid, uint line)
1007 {
1008         switch( mode ) {
1009         case BtLockRead:
1010                 ReadLock (latch->readwr);
1011                 break;
1012         case BtLockWrite:
1013                 WriteLock (latch->readwr, tid, line);
1014                 break;
1015         case BtLockAccess:
1016                 ReadLock (latch->access);
1017                 break;
1018         case BtLockDelete:
1019                 WriteLock (latch->access, tid, line);
1020                 break;
1021         case BtLockParent:
1022                 WriteLock (latch->parent, tid, line);
1023                 break;
1024         case BtLockLink:
1025                 WriteLock (latch->link, tid, line);
1026                 break;
1027         }
1028 }
1029
1030 // remove write, read, or parent lock on requested page
1031
1032 void bt_unlockpage(BtLock mode, BtLatchSet *latch, uint line)
1033 {
1034         switch( mode ) {
1035         case BtLockRead:
1036                 ReadRelease (latch->readwr);
1037                 break;
1038         case BtLockWrite:
1039                 WriteRelease (latch->readwr);
1040                 break;
1041         case BtLockAccess:
1042                 ReadRelease (latch->access);
1043                 break;
1044         case BtLockDelete:
1045                 WriteRelease (latch->access);
1046                 break;
1047         case BtLockParent:
1048                 WriteRelease (latch->parent);
1049                 break;
1050         case BtLockLink:
1051                 WriteRelease (latch->link);
1052                 break;
1053         }
1054 }
1055
1056 //      allocate a new page
1057 //      return with page latched, but unlocked.
1058 //      contents is cleared for lvl > 0
1059
1060 int bt_newpage(BtMgr *mgr, BtPageSet *set, BtPage contents)
1061 {
1062 uint page_size = mgr->page_size, blk;
1063 uid *freechain;
1064 uid page_no;
1065
1066         //      lock allocation page
1067
1068         bt_mutexlock(mgr->pagezero->lock);
1069
1070         if( contents->lvl ) {
1071                 freechain = &mgr->pagezero->freechain;
1072                 mgr->pagezero->upperpages++;
1073         } else {
1074                 freechain = &mgr->pagezero->leafchain;
1075                 mgr->pagezero->leafpages++;
1076                 page_size <<= mgr->leaf_xtra;
1077         }
1078
1079         // use empty chain first
1080         // else allocate new page
1081
1082         if( page_no = *freechain ) {
1083                 if( set->latch = bt_pinlatch (mgr, page_no) )
1084                         set->page = bt_mappage (mgr, set->latch);
1085                 else
1086                         return mgr->line = __LINE__, mgr->err = BTERR_struct;
1087
1088                 *freechain = set->page->right;
1089
1090                 //  the page is currently nopromote and this
1091                 //  will keep bt_promote out.
1092
1093                 //      contents will replace this bit
1094                 //  and pin will keep bt_promote out
1095
1096                 contents->nopromote = 0;
1097
1098                 memcpy (set->page, contents, page_size);
1099
1100 //              if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1101 //                fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1102
1103                 bt_releasemutex(mgr->pagezero->lock);
1104                 return 0;
1105         }
1106
1107         //  obtain next available page number
1108         //      suitable for leaf or higher level
1109
1110         page_no = mgr->pagezero->allocpage;
1111         mgr->pagezero->allocpage += 1 << mgr->leaf_xtra;
1112
1113         //      keep bt_promote out of this page
1114
1115         contents->nopromote = 1;
1116
1117         // unlock allocation latch and
1118         //      extend file into new page.
1119
1120 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1121 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1122
1123         if( bt_writepage (mgr, contents, page_no, !contents->lvl) )
1124                 fprintf(stderr, "Write %lld error %d\n", page_no + blk, errno);
1125
1126         //      chain together unused non-leaf allocation
1127
1128         if( contents->lvl ) {
1129           memset (contents, 0, mgr->page_size);
1130
1131           for( blk = 1; blk < 1 << mgr->leaf_xtra; blk++ ) {
1132                 if( bt_writepage (mgr, contents, page_no + blk, 0) )
1133                   fprintf(stderr, "Write %lld error %d\n", page_no + blk, errno);
1134                 contents->right = page_no + blk;
1135                 *freechain = page_no + blk;
1136           }
1137         }
1138
1139         bt_releasemutex(mgr->pagezero->lock);
1140
1141         if( set->latch = bt_pinlatch (mgr, page_no) )
1142                 set->page = bt_mappage (mgr, set->latch);
1143         else
1144                 return mgr->err;
1145
1146         // now pin will keep bt_promote out
1147
1148         set->page->nopromote = 0;
1149         return 0;
1150 }
1151
1152 //  find slot in page for given key at a given level
1153
1154 int bt_findslot (BtPage page, unsigned char *key, uint len)
1155 {
1156 uint diff, higher = page->cnt, low = 1, slot;
1157 uint good = 0;
1158
1159         //        make stopper key an infinite fence value
1160
1161         if( page->right )
1162                 higher++;
1163         else
1164                 good++;
1165
1166         //      low is the lowest candidate.
1167         //  loop ends when they meet
1168
1169         //  higher is already
1170         //      tested as .ge. the passed key.
1171
1172         while( diff = higher - low ) {
1173                 slot = low + ( diff >> 1 );
1174                 if( keycmp (keyptr(page, slot), key, len) < 0 )
1175                         low = slot + 1;
1176                 else
1177                         higher = slot, good++;
1178         }
1179
1180         //      return zero if key is on right link page
1181
1182         return good ? higher : 0;
1183 }
1184
1185 //  find and load page at given level for given key
1186 //      leave page rd or wr locked as requested
1187
1188 int bt_loadpage (BtMgr *mgr, BtPageSet *set, unsigned char *key, uint len, uint lvl, BtLock lock, pid_t tid)
1189 {
1190 uid page_no = ROOT_page, prevpage_no = 0;
1191 uint drill = 0xff, slot;
1192 uint mode, prevmode;
1193 BtPageSet prev[1];
1194 BtVal *val;
1195 BtKey *ptr;
1196
1197   //  start at root of btree and drill down
1198
1199   do {
1200         if( set->latch = bt_pinlatch (mgr, page_no) )
1201           set->page = bt_mappage (mgr, set->latch);
1202         else
1203           return 0;
1204
1205         if( page_no > ROOT_page )
1206           bt_lockpage(BtLockAccess, set->latch, tid, __LINE__);
1207
1208         //      release & unpin parent or left sibling page
1209
1210         if( prevpage_no ) {
1211           bt_unlockpage(prevmode, prev->latch, __LINE__);
1212           bt_unpinlatch (prev->latch);
1213           prevpage_no = 0;
1214         }
1215
1216         // obtain mode lock using lock coupling through AccessLock
1217         // determine lock mode of drill level
1218
1219         mode = (drill == lvl) ? lock : BtLockRead; 
1220         bt_lockpage(mode, set->latch, tid, __LINE__);
1221
1222         // grab our fence key
1223
1224         ptr=fenceptr(set->page);
1225
1226         if( set->page->free )
1227                 return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1228
1229         if( page_no > ROOT_page )
1230           bt_unlockpage(BtLockAccess, set->latch, __LINE__);
1231
1232         // re-read and re-lock root after determining actual level of root
1233
1234         if( set->page->lvl != drill) {
1235                 if( set->latch->page_no != ROOT_page )
1236                         return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1237                         
1238                 drill = set->page->lvl;
1239
1240                 if( lock != BtLockRead && drill == lvl ) {
1241                   bt_unlockpage(mode, set->latch, __LINE__);
1242                   bt_unpinlatch (set->latch);
1243                   continue;
1244                 }
1245         }
1246
1247         prevpage_no = set->latch->page_no;
1248         prevmode = mode;
1249         *prev = *set;
1250
1251         //  if requested key is beyond our fence,
1252         //      slide to the right
1253
1254         if( keycmp (ptr, key, len) < 0 )
1255           if( page_no = set->page->right )
1256                 continue;
1257
1258         //  if page is part of a delete operation,
1259         //      slide to the left;
1260
1261         if( set->page->kill ) {
1262           bt_lockpage(BtLockLink, set->latch, tid, __LINE__);
1263           page_no = set->page->left;
1264           bt_unlockpage(BtLockLink, set->latch, __LINE__);
1265           continue;
1266         }
1267
1268         //  find key on page at this level
1269         //  and descend to requested level
1270
1271         if( slot = bt_findslot (set->page, key, len) ) {
1272           if( drill == lvl )
1273                 return slot;
1274
1275           // find next non-dead slot -- the fence key if nothing else
1276
1277           while( slotptr(set->page, slot)->dead )
1278                 if( slot++ < set->page->cnt )
1279                   continue;
1280                 else
1281                   return mgr->err = BTERR_struct, mgr->line = __LINE__, 0;
1282
1283           val = valptr(set->page, slot);
1284
1285           if( val->len == BtId )
1286                 page_no = bt_getid(val->value);
1287           else
1288                 return mgr->line = __LINE__, mgr->err = BTERR_struct, 0;
1289
1290           drill--;
1291           continue;
1292          }
1293
1294         //  slide right into next page
1295
1296         page_no = set->page->right;
1297
1298   } while( page_no );
1299
1300   // return error on end of right chain
1301
1302   mgr->line = __LINE__, mgr->err = BTERR_struct;
1303   return 0;     // return error
1304 }
1305
1306 //      return page to free list
1307 //      page must be delete, link & write locked
1308 //      and have no keys pointing to it.
1309
1310 void bt_freepage (BtMgr *mgr, BtPageSet *set)
1311 {
1312 uid *freechain;
1313
1314         //      lock allocation page
1315
1316         bt_mutexlock (mgr->pagezero->lock);
1317
1318         if( set->page->lvl ) {
1319                 freechain = &mgr->pagezero->freechain;
1320                 mgr->pagezero->upperpages--;
1321         } else {
1322                 freechain = &mgr->pagezero->leafchain;
1323                 mgr->pagezero->leafpages--;
1324         }
1325
1326         //      store chain link
1327
1328         set->page->right = *freechain;
1329         *freechain = set->latch->page_no;
1330         set->page->free = 1;
1331
1332 //      if( msync (mgr->pagezero, mgr->page_size, MS_SYNC) < 0 )
1333 //        fprintf(stderr, "msync error %d line %d\n", errno, __LINE__);
1334
1335         // unlock released page
1336         // and unlock allocation page
1337
1338         bt_unlockpage (BtLockDelete, set->latch, __LINE__);
1339         bt_unlockpage (BtLockWrite, set->latch, __LINE__);
1340         bt_unlockpage (BtLockLink, set->latch, __LINE__);
1341         bt_unpinlatch (set->latch);
1342         bt_releasemutex (mgr->pagezero->lock);
1343 }
1344
1345 //      a fence key was deleted from an interiour level page
1346 //      push new fence value upwards
1347
1348 BTERR bt_fixfence (BtMgr *mgr, BtPageSet *set, uint lvl)
1349 {
1350 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1351 unsigned char value[BtId];
1352 BtKey* ptr;
1353 uint idx;
1354
1355         //      remove the old fence value
1356
1357         ptr = fenceptr(set->page);
1358         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1359         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1360         set->page->fence = slotptr(set->page, set->page->cnt)->off;
1361
1362         //  cache new fence value
1363
1364         ptr = fenceptr(set->page);
1365         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1366
1367         bt_lockpage (BtLockParent, set->latch, 0, __LINE__);
1368         bt_unlockpage (BtLockWrite, set->latch, __LINE__);
1369
1370         //      insert new (now smaller) fence key
1371
1372         bt_putid (value, set->latch->page_no);
1373         ptr = (BtKey*)leftkey;
1374
1375         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique) )
1376           return mgr->err;
1377
1378         //      now delete old fence key
1379
1380         ptr = (BtKey*)rightkey;
1381
1382         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1) )
1383                 return mgr->err;
1384
1385         bt_unlockpage (BtLockParent, set->latch, __LINE__);
1386         bt_unpinlatch(set->latch);
1387         return 0;
1388 }
1389
1390 //      root has a single child
1391 //      collapse a level from the tree
1392
1393 BTERR bt_collapseroot (BtMgr *mgr, BtPageSet *root)
1394 {
1395 BtPageSet child[1];
1396 uid page_no;
1397 BtVal *val;
1398 uint idx;
1399
1400   // find the child entry and promote as new root contents
1401
1402   do {
1403         for( idx = 0; idx++ < root->page->cnt; )
1404           if( !slotptr(root->page, idx)->dead )
1405                 break;
1406
1407         val = valptr(root->page, idx);
1408
1409         if( val->len == BtId )
1410                 page_no = bt_getid (valptr(root->page, idx)->value);
1411         else
1412                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1413
1414         if( child->latch = bt_pinlatch (mgr, page_no) )
1415                 child->page = bt_mappage (mgr, child->latch);
1416         else
1417                 return mgr->err;
1418
1419         bt_lockpage (BtLockDelete, child->latch, 0, __LINE__);
1420         bt_lockpage (BtLockWrite, child->latch, 0, __LINE__);
1421
1422         memcpy (root->page, child->page, mgr->page_size);
1423         bt_freepage (mgr, child);
1424
1425   } while( root->page->lvl > 1 && root->page->act == 1 );
1426
1427   bt_unlockpage (BtLockWrite, root->latch, __LINE__);
1428   bt_unpinlatch (root->latch);
1429   return 0;
1430 }
1431
1432 //  delete a page and manage key
1433 //  call with page writelocked
1434
1435 //      returns with page unpinned
1436 //      from the page pool.
1437
1438 BTERR bt_deletepage (BtMgr *mgr, BtPageSet *set, uint lvl)
1439 {
1440 unsigned char higherfence[BT_keyarray], lowerfence[BT_keyarray];
1441 uint page_size = mgr->page_size, kill;
1442 BtPageSet right[1], temp[1];
1443 unsigned char value[BtId];
1444 uid page_no, right2;
1445 BtKey *ptr;
1446
1447         if( !lvl )
1448                 page_size <<= mgr->leaf_xtra;
1449
1450         //  cache original copy of original fence key
1451         //      that is going to be deleted.
1452
1453         ptr = fenceptr(set->page);
1454         memcpy (lowerfence, ptr, ptr->len + sizeof(BtKey));
1455
1456         //      pin and lock our right page
1457
1458         page_no = set->page->right;
1459
1460         if( right->latch = bt_pinlatch (mgr, page_no) )
1461                 right->page = bt_mappage (mgr, right->latch);
1462         else
1463                 return 0;
1464
1465         bt_lockpage (BtLockWrite, right->latch, 0, __LINE__);
1466
1467         if( right->page->kill || set->page->kill )
1468                 return mgr->line = __LINE__, mgr->err = BTERR_struct;
1469
1470         // pull contents of right sibling over our empty page
1471         //      preserving our left page number, and its right page number.
1472
1473         bt_lockpage (BtLockLink, set->latch, 0, __LINE__);
1474         page_no = set->page->left;
1475         memcpy (set->page, right->page, page_size);
1476         set->page->left = page_no;
1477         bt_unlockpage (BtLockLink, set->latch, __LINE__);
1478
1479         //  fix left link from far right page
1480
1481         if( right2 = set->page->right ) {
1482           if( temp->latch = bt_pinlatch (mgr, right2) )
1483                 temp->page = bt_mappage (mgr, temp->latch);
1484           else
1485                 return 0;
1486
1487           bt_lockpage (BtLockAccess, temp->latch, 0, __LINE__);
1488       bt_lockpage(BtLockLink, temp->latch, 0, __LINE__);
1489           temp->page->left = set->latch->page_no;
1490           bt_unlockpage(BtLockLink, temp->latch, __LINE__);
1491           bt_unlockpage(BtLockAccess, temp->latch, __LINE__);
1492           bt_unpinlatch (temp->latch);
1493         } else if( !lvl ) {     // our page is now rightmost leaf
1494           bt_mutexlock (mgr->pagezero->lock);
1495           mgr->pagezero->rightleaf = set->latch->page_no;
1496           bt_releasemutex(mgr->pagezero->lock);
1497         }
1498
1499         ptr = fenceptr(set->page);
1500         memcpy (higherfence, ptr, ptr->len + sizeof(BtKey));
1501
1502         // mark right page as being deleted and release lock
1503         //      keep lock on parent modification.
1504
1505         right->page->kill = 1;
1506         bt_lockpage (BtLockParent, right->latch, 0, __LINE__);
1507         bt_unlockpage (BtLockWrite, right->latch, __LINE__);
1508
1509         bt_lockpage (BtLockParent, set->latch, 0, __LINE__);
1510         bt_unlockpage (BtLockWrite, set->latch, __LINE__);
1511
1512         // redirect the new higher key directly to our new node
1513
1514         ptr = (BtKey *)higherfence;
1515         bt_putid (value, set->latch->page_no);
1516
1517         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Update) )
1518           return mgr->err;
1519
1520         //      delete our original fence key in parent
1521
1522         ptr = (BtKey *)lowerfence;
1523
1524         if( bt_deletekey (mgr, ptr->key, ptr->len, lvl+1) )
1525           return mgr->err;
1526
1527         //  wait for all access to drain away with delete lock,
1528         //      then obtain write lock to right node and free it.
1529
1530         bt_lockpage (BtLockDelete, right->latch, 0, __LINE__);
1531         bt_lockpage (BtLockWrite, right->latch, 0, __LINE__);
1532         bt_lockpage (BtLockLink, right->latch, 0, __LINE__);
1533         bt_unlockpage (BtLockParent, right->latch, __LINE__);
1534         bt_freepage (mgr, right);
1535
1536         //      release parent lock to our node
1537
1538         bt_unlockpage (BtLockParent, set->latch, __LINE__);
1539         bt_unpinlatch (set->latch);
1540         return 0;
1541 }
1542
1543 //  find and delete key on page by marking delete flag bit
1544 //  if page becomes empty, delete it from the btree
1545
1546 BTERR bt_deletekey (BtMgr *mgr, unsigned char *key, uint len, uint lvl)
1547 {
1548 uint slot, idx, found, fence;
1549 BtPageSet set[1];
1550 BtSlot *node;
1551 BtKey *ptr;
1552 BtVal *val;
1553
1554         if( slot = bt_loadpage (mgr, set, key, len, lvl, BtLockWrite, 0) ) {
1555                 node = slotptr(set->page, slot);
1556                 ptr = keyptr(set->page, slot);
1557         } else
1558                 return mgr->err;
1559
1560         // if librarian slot, advance to real slot
1561
1562         if( node->type == Librarian ) {
1563                 ptr = keyptr(set->page, ++slot);
1564                 node = slotptr(set->page, slot);
1565         }
1566
1567         fence = slot == set->page->cnt;
1568
1569         // delete the key, ignore request if already dead
1570
1571         if( found = !keycmp (ptr, key, len) )
1572           if( found = node->dead == 0 ) {
1573                 val = valptr(set->page,slot);
1574                 set->page->garbage += ptr->len + val->len + sizeof(BtKey) + sizeof(BtVal);
1575                 set->page->act--;
1576                 node->dead = 1;
1577
1578                 // collapse empty slots beneath the fence
1579                 // on interiour nodes
1580
1581                 if( lvl )
1582                  while( idx = set->page->cnt - 1 )
1583                   if( slotptr(set->page, idx)->dead ) {
1584                         *slotptr(set->page, idx) = *slotptr(set->page, idx + 1);
1585                         memset (slotptr(set->page, set->page->cnt--), 0, sizeof(BtSlot));
1586                   } else
1587                         break;
1588           }
1589
1590         if( !found )
1591                 return 0;
1592
1593         //      did we delete a fence key in an upper level?
1594
1595         if( lvl && set->page->act && fence )
1596           return bt_fixfence (mgr, set, lvl);
1597
1598         //      do we need to collapse root?
1599
1600         if( lvl > 1 && set->latch->page_no == ROOT_page && set->page->act == 1 )
1601           return bt_collapseroot (mgr, set);
1602
1603         //      delete empty page
1604
1605         if( !set->page->act )
1606           return bt_deletepage (mgr, set, set->page->lvl);
1607
1608         bt_unlockpage(BtLockWrite, set->latch, __LINE__);
1609         bt_unpinlatch (set->latch);
1610         return 0;
1611 }
1612
1613 //      check page for space available,
1614 //      clean if necessary and return
1615 //      0 - page needs splitting
1616 //      >0  new slot value
1617
1618 uint bt_cleanpage(BtMgr *mgr, BtPageSet *set, uint keylen, uint slot, uint vallen)
1619 {
1620 uint page_size = mgr->page_size;
1621 BtPage page = set->page, frame;
1622 uint cnt = 0, idx = 0;
1623 uint max = page->cnt;
1624 uint newslot = max;
1625 BtKey *key;
1626 BtVal *val;
1627
1628         if( !set->page->lvl )
1629                 page_size <<= mgr->leaf_xtra;
1630
1631         if( page->min >= (max+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal))
1632                 return slot;
1633
1634         //      skip cleanup and proceed to split
1635         //      if there's not enough garbage
1636         //      to bother with.
1637
1638         if( page->garbage < page_size / 5 )
1639                 return 0;
1640
1641         frame = malloc (page_size);
1642         memcpy (frame, page, page_size);
1643
1644         // skip page info and set rest of page to zero
1645
1646         memset (page+1, 0, page_size - sizeof(*page));
1647
1648         page->min = page_size;
1649         page->garbage = 0;
1650         page->act = 0;
1651
1652         // clean up page first by
1653         // removing dead keys
1654
1655         while( cnt++ < max ) {
1656                 if( cnt == slot )
1657                         newslot = idx + 2;
1658
1659                 if( cnt < max || frame->lvl )
1660                   if( slotptr(frame,cnt)->dead )
1661                         continue;
1662
1663                 // copy the value across
1664
1665                 val = valptr(frame, cnt);
1666                 page->min -= val->len + sizeof(BtVal);
1667                 memcpy ((unsigned char *)page + page->min, val, val->len + sizeof(BtVal));
1668
1669                 // copy the key across
1670
1671                 key = keyptr(frame, cnt);
1672                 page->min -= key->len + sizeof(BtKey);
1673                 memcpy ((unsigned char *)page + page->min, key, key->len + sizeof(BtKey));
1674
1675                 // make a librarian slot
1676
1677                 slotptr(page, ++idx)->off = page->min;
1678                 slotptr(page, idx)->type = Librarian;
1679                 slotptr(page, idx)->dead = 1;
1680
1681                 // set up the slot
1682
1683                 slotptr(page, ++idx)->off = page->min;
1684                 slotptr(page, idx)->type = slotptr(frame, cnt)->type;
1685
1686                 if( !(slotptr(page, idx)->dead = slotptr(frame, cnt)->dead) )
1687                   page->act++;
1688         }
1689
1690         page->fence = page->min;
1691         page->cnt = idx;
1692         free (frame);
1693
1694         //      see if page has enough space now, or does it need splitting?
1695
1696         if( page->min >= (idx+2) * sizeof(BtSlot) + sizeof(*page) + keylen + sizeof(BtKey) + vallen + sizeof(BtVal) )
1697                 return newslot;
1698
1699         return 0;
1700 }
1701
1702 // split the root and raise the height of the btree
1703
1704 BTERR bt_splitroot(BtMgr *mgr, BtPageSet *root, BtLatchSet *right)
1705 {  
1706 unsigned char leftkey[BT_keyarray];
1707 uint nxt = mgr->page_size;
1708 unsigned char value[BtId];
1709 BtPage frame, page;
1710 BtPageSet left[1];
1711 uid left_page_no;
1712 BtKey *ptr;
1713 BtVal *val;
1714
1715         frame = malloc (mgr->page_size);
1716         memcpy (frame, root->page, mgr->page_size);
1717
1718         //      save left page fence key for new root
1719
1720         ptr = fenceptr(root->page);
1721         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1722
1723         //  Obtain an empty page to use, and copy the current
1724         //  root contents into it, e.g. lower keys
1725
1726         if( bt_newpage(mgr, left, frame) )
1727                 return mgr->err;
1728
1729         left_page_no = left->latch->page_no;
1730         bt_unpinlatch (left->latch);
1731         free (frame);
1732
1733         //      left link the pages together
1734
1735         page = bt_mappage (mgr, right);
1736         page->left = left_page_no;
1737
1738         // preserve the page info at the bottom
1739         // of higher keys and set rest to zero
1740
1741         memset(root->page+1, 0, mgr->page_size - sizeof(*root->page));
1742
1743         // insert stopper key at top of newroot page
1744         // and increase the root height
1745
1746         nxt -= BtId + sizeof(BtVal);
1747         bt_putid (value, right->page_no);
1748         val = (BtVal *)((unsigned char *)root->page + nxt);
1749         memcpy (val->value, value, BtId);
1750         val->len = BtId;
1751
1752         nxt -= 2 + sizeof(BtKey);
1753         page->fence = nxt;
1754
1755         slotptr(root->page, 2)->off = nxt;
1756         ptr = (BtKey *)((unsigned char *)root->page + nxt);
1757         ptr->len = 2;
1758         ptr->key[0] = 0xff;
1759         ptr->key[1] = 0xff;
1760
1761         // insert lower keys page fence key on newroot page as first key
1762
1763         nxt -= BtId + sizeof(BtVal);
1764         bt_putid (value, left_page_no);
1765         val = (BtVal *)((unsigned char *)root->page + nxt);
1766         memcpy (val->value, value, BtId);
1767         val->len = BtId;
1768
1769         ptr = (BtKey *)leftkey;
1770         nxt -= ptr->len + sizeof(BtKey);
1771         slotptr(root->page, 1)->off = nxt;
1772         memcpy ((unsigned char *)root->page + nxt, leftkey, ptr->len + sizeof(BtKey));
1773         
1774         root->page->right = 0;
1775         root->page->min = nxt;          // reset lowest used offset and key count
1776         root->page->cnt = 2;
1777         root->page->act = 2;
1778         root->page->lvl++;
1779
1780         // release and unpin root pages
1781
1782         bt_unlockpage(BtLockWrite, root->latch, __LINE__);
1783         bt_unpinlatch (root->latch);
1784
1785         bt_unpinlatch (right);
1786         return 0;
1787 }
1788
1789 //  split already locked full node
1790 //      leave it locked.
1791 //      return pool entry for new right
1792 //      page, pinned & unlocked
1793
1794 uint bt_splitpage (BtMgr *mgr, BtPageSet *set, uint linkleft)
1795 {
1796 uint page_size = mgr->page_size;
1797 BtPageSet right[1], temp[1];
1798 uint cnt = 0, idx = 0, max;
1799 uint lvl = set->page->lvl;
1800 BtPage frame;
1801 BtKey *key;
1802 BtVal *val;
1803 uid right2;
1804 uint entry;
1805 uint prev;
1806
1807         if( !set->page->lvl )
1808                 page_size <<= mgr->leaf_xtra;
1809
1810         //  split higher half of keys to frame
1811
1812         frame = malloc (page_size);
1813         memset (frame, 0, page_size);
1814         frame->min = page_size;
1815         max = set->page->cnt;
1816         cnt = max / 2;
1817         idx = 0;
1818
1819         while( cnt++ < max ) {
1820                 if( cnt < max || set->page->lvl )
1821                   if( slotptr(set->page, cnt)->dead )
1822                         continue;
1823
1824                 val = valptr(set->page, cnt);
1825                 frame->min -= val->len + sizeof(BtVal);
1826                 memcpy ((unsigned char *)frame + frame->min, val, val->len + sizeof(BtVal));
1827
1828                 key = keyptr(set->page, cnt);
1829                 frame->min -= key->len + sizeof(BtKey);
1830                 memcpy ((unsigned char *)frame + frame->min, key, key->len + sizeof(BtKey));
1831
1832                 //      add librarian slot
1833
1834                 slotptr(frame, ++idx)->off = frame->min;
1835                 slotptr(frame, idx)->type = Librarian;
1836                 slotptr(frame, idx)->dead = 1;
1837
1838                 //  add actual slot
1839
1840                 slotptr(frame, ++idx)->off = frame->min;
1841                 slotptr(frame, idx)->type = slotptr(set->page, cnt)->type;
1842
1843                 if( !(slotptr(frame, idx)->dead = slotptr(set->page, cnt)->dead) )
1844                   frame->act++;
1845         }
1846
1847         frame->fence = frame->min;
1848         frame->cnt = idx;
1849         frame->lvl = lvl;
1850
1851         // link right node
1852
1853         if( set->latch->page_no > ROOT_page ) {
1854                 right2 = set->page->right;
1855                 frame->right = right2;
1856
1857                 if( linkleft )
1858                         frame->left = set->latch->page_no;
1859         }
1860
1861         //      get new free page and write higher keys to it.
1862
1863         if( bt_newpage(mgr, right, frame) )
1864                 return 0;
1865
1866         //      link far right's left pointer to new page
1867
1868         if( linkleft && set->latch->page_no > ROOT_page )
1869          if( right2 ) {
1870           if( temp->latch = bt_pinlatch (mgr, right2) )
1871                 temp->page = bt_mappage (mgr, temp->latch);
1872           else
1873                 return 0;
1874
1875       bt_lockpage(BtLockLink, temp->latch, 0, __LINE__);
1876           temp->page->left = right->latch->page_no;
1877           bt_unlockpage(BtLockLink, temp->latch, __LINE__);
1878           bt_unpinlatch (temp->latch);
1879          } else if( !lvl ) {    // page is rightmost leaf
1880           bt_mutexlock (mgr->pagezero->lock);
1881           mgr->pagezero->rightleaf = right->latch->page_no;
1882           bt_releasemutex(mgr->pagezero->lock);
1883          }
1884
1885         // process lower keys
1886
1887         memcpy (frame, set->page, page_size);
1888         memset (set->page+1, 0, page_size - sizeof(*set->page));
1889
1890         set->page->min = page_size;
1891         set->page->garbage = 0;
1892         set->page->act = 0;
1893         max /= 2;
1894         cnt = 0;
1895         idx = 0;
1896
1897         //  assemble page of smaller keys
1898
1899         while( cnt++ < max ) {
1900                 if( slotptr(frame, cnt)->dead )
1901                         continue;
1902                 val = valptr(frame, cnt);
1903                 set->page->min -= val->len + sizeof(BtVal);
1904                 memcpy ((unsigned char *)set->page + set->page->min, val, val->len + sizeof(BtVal));
1905
1906                 key = keyptr(frame, cnt);
1907                 set->page->min -= key->len + sizeof(BtKey);
1908                 memcpy ((unsigned char *)set->page + set->page->min, key, key->len + sizeof(BtKey));
1909
1910                 //      add librarian slot
1911
1912                 slotptr(set->page, ++idx)->off = set->page->min;
1913                 slotptr(set->page, idx)->type = Librarian;
1914                 slotptr(set->page, idx)->dead = 1;
1915
1916                 //      add actual slot
1917
1918                 slotptr(set->page, ++idx)->off = set->page->min;
1919                 slotptr(set->page, idx)->type = slotptr(frame, cnt)->type;
1920                 set->page->act++;
1921         }
1922
1923         set->page->right = right->latch->page_no;
1924         set->page->fence = set->page->min;
1925         set->page->cnt = idx;
1926         free(frame);
1927
1928         entry = right->latch - mgr->latchsets;
1929         return entry;
1930 }
1931
1932 //      fix keys for newly split page
1933 //      call with both pages pinned & locked
1934 //  return unlocked and unpinned
1935
1936 BTERR bt_splitkeys (BtMgr *mgr, BtPageSet *set, BtLatchSet *right)
1937 {
1938 unsigned char leftkey[BT_keyarray], rightkey[BT_keyarray];
1939 unsigned char value[BtId];
1940 uint lvl = set->page->lvl;
1941 BtPageSet temp[1];
1942 BtPage page;
1943 BtKey *ptr;
1944 uid right2;
1945
1946         // if current page is the root page, split it
1947
1948         if( set->latch->page_no == ROOT_page )
1949                 return bt_splitroot (mgr, set, right);
1950
1951         ptr = fenceptr(set->page);
1952         memcpy (leftkey, ptr, ptr->len + sizeof(BtKey));
1953
1954         page = bt_mappage (mgr, right);
1955
1956         ptr = fenceptr(page);
1957         memcpy (rightkey, ptr, ptr->len + sizeof(BtKey));
1958
1959         //      splice in far right page's left page_no
1960
1961         if( right2 = page->right ) {
1962           if( temp->latch = bt_pinlatch (mgr, right2) )
1963                 temp->page = bt_mappage (mgr, temp->latch);
1964           else
1965                 return 0;
1966
1967       bt_lockpage(BtLockLink, temp->latch, 0, __LINE__);
1968           temp->page->left = right->page_no;
1969           bt_unlockpage(BtLockLink, temp->latch, __LINE__);
1970           bt_unpinlatch (temp->latch);
1971         } else if( !lvl ) {  // right page is far right page
1972           bt_mutexlock (mgr->pagezero->lock);
1973           mgr->pagezero->rightleaf = right->page_no;
1974           bt_releasemutex(mgr->pagezero->lock);
1975         }
1976         // insert new fences in their parent pages
1977
1978         bt_lockpage (BtLockParent, right, 0, __LINE__);
1979
1980         bt_lockpage (BtLockParent, set->latch, 0, __LINE__);
1981         bt_unlockpage (BtLockWrite, set->latch, __LINE__);
1982
1983         // insert new fence for reformulated left block of smaller keys
1984
1985         bt_putid (value, set->latch->page_no);
1986         ptr = (BtKey *)leftkey;
1987
1988         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique) )
1989                 return mgr->err;
1990
1991         // switch fence for right block of larger keys to new right page
1992
1993         bt_putid (value, right->page_no);
1994         ptr = (BtKey *)rightkey;
1995
1996         if( bt_insertkey (mgr, ptr->key, ptr->len, lvl+1, value, BtId, Unique) )
1997                 return mgr->err;
1998
1999         bt_unlockpage (BtLockParent, set->latch, __LINE__);
2000         bt_unpinlatch (set->latch);
2001
2002         bt_unlockpage (BtLockParent, right, __LINE__);
2003         bt_unpinlatch (right);
2004         return 0;
2005 }
2006
2007 //      install new key and value onto page
2008 //      page must already be checked for
2009 //      adequate space
2010
2011 BTERR bt_insertslot (BtMgr *mgr, BtPageSet *set, uint slot, unsigned char *key,uint keylen, unsigned char *value, uint vallen, uint type)
2012 {
2013 uint idx, librarian;
2014 BtSlot *node;
2015 BtKey *ptr;
2016 BtVal *val;
2017 int rate;
2018
2019         //      if previous slot is a librarian slot, use it
2020
2021         if( slot > 1 )
2022           if( slotptr(set->page, slot-1)->type == Librarian )
2023                 slot--;
2024
2025         // copy value onto page
2026
2027         set->page->min -= vallen + sizeof(BtVal);
2028         val = (BtVal*)((unsigned char *)set->page + set->page->min);
2029         memcpy (val->value, value, vallen);
2030         val->len = vallen;
2031
2032         // copy key onto page
2033
2034         set->page->min -= keylen + sizeof(BtKey);
2035         ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2036         memcpy (ptr->key, key, keylen);
2037         ptr->len = keylen;
2038         
2039         //      find first empty slot at or above our insert slot
2040
2041         for( idx = slot; idx < set->page->cnt; idx++ )
2042           if( slotptr(set->page, idx)->dead )
2043                 break;
2044
2045         // now insert key into array before slot.
2046
2047         //      if we're going all the way to the top,
2048         //      add as many librarian slots as
2049         //      makes sense.
2050
2051         if( idx == set->page->cnt ) {
2052         int avail = 4 * set->page->min / 5 - sizeof(*set->page) - ++set->page->cnt * sizeof(BtSlot);
2053
2054                 librarian = ++idx - slot;
2055                 avail /= sizeof(BtSlot);
2056
2057                 if( avail < 0 )
2058                         avail = 0;
2059
2060                 if( librarian > avail )
2061                         librarian = avail;
2062
2063                 if( librarian ) {
2064                         rate = (idx - slot) / librarian;
2065                         set->page->cnt += librarian;
2066                         idx += librarian;
2067                 } else
2068                         rate = 0;
2069         } else
2070                 librarian = 0, rate = 0;
2071
2072         //  transfer slots and add librarian slots
2073
2074         while( idx > slot ) {
2075                 *slotptr(set->page, idx) = *slotptr(set->page, idx-librarian-1);
2076
2077                 //      add librarian slot per rate
2078
2079                 if( librarian )
2080                  if( (idx - slot)/2 <= librarian * rate ) {
2081                         node = slotptr(set->page, --idx);
2082                         node->off = node[1].off;
2083                         node->type = Librarian;
2084                         node->dead = 1;
2085                         librarian--;
2086                   }
2087
2088                 --idx;
2089         }
2090
2091         set->page->act++;
2092
2093         //      fill in new slot
2094
2095         node = slotptr(set->page, slot);
2096         node->off = set->page->min;
2097         node->type = type;
2098         node->dead = 0;
2099         return 0;
2100 }
2101
2102 //  Insert new key into the btree at given level.
2103 //      either add a new key or update/add an existing one
2104
2105 BTERR bt_insertkey (BtMgr *mgr, unsigned char *key, uint keylen, uint lvl, void *value, uint vallen, BtSlotType type)
2106 {
2107 uint slot, idx, len, entry;
2108 BtPageSet set[1];
2109 BtSlot *node;
2110 BtKey *ptr;
2111 BtVal *val;
2112
2113   while( 1 ) { // find the page and slot for the current key
2114         if( slot = bt_loadpage (mgr, set, key, keylen, lvl, BtLockWrite, 0) ) {
2115                 node = slotptr(set->page, slot);
2116                 ptr = keyptr(set->page, slot);
2117         } else
2118                 return mgr->err;
2119
2120         // if librarian slot == found slot, advance to real slot
2121
2122         if( node->type == Librarian ) {
2123                 node = slotptr(set->page, ++slot);
2124                 ptr = keyptr(set->page, slot);
2125         }
2126
2127         //  if inserting a duplicate key or unique
2128         //      key that doesn't exist on the page,
2129         //      check for adequate space on the page
2130         //      and insert the new key before slot.
2131
2132         switch( type ) {
2133         case Unique:
2134         case Duplicate:
2135           if( !keycmp (ptr, key, keylen) )
2136                 break;
2137
2138           if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2139             if( bt_insertslot (mgr, set, slot, key, keylen, value, vallen, type) )
2140                   return mgr->err;
2141                 else
2142                   goto insxit;
2143
2144           if( entry = bt_splitpage (mgr, set, 1) )
2145             if( !bt_splitkeys (mgr, set, entry + mgr->latchsets) )
2146                   continue;
2147
2148           return mgr->err;
2149
2150         case Update:
2151           if( keycmp (ptr, key, keylen) )
2152                 goto insxit;
2153
2154           break;
2155         }
2156
2157         // if key already exists, update value and return
2158
2159         val = valptr(set->page, slot);
2160
2161         if( val->len >= vallen ) {
2162           if( node->dead )
2163                 set->page->act++;
2164           node->type = type;
2165           node->dead = 0;
2166
2167           set->page->garbage += val->len - vallen;
2168           val->len = vallen;
2169           memcpy (val->value, value, vallen);
2170           goto insxit;
2171         }
2172
2173         //  new update value doesn't fit in existing value area
2174         //      make sure page has room
2175
2176         if( !node->dead )
2177           set->page->garbage += val->len + ptr->len + sizeof(BtKey) + sizeof(BtVal);
2178         else
2179           set->page->act++;
2180
2181         node->type = type;
2182         node->dead = 0;
2183
2184         if( slot = bt_cleanpage (mgr, set, keylen, slot, vallen) )
2185           break;
2186
2187         if( entry = bt_splitpage (mgr, set, 1) )
2188           if( !bt_splitkeys (mgr, set, entry + mgr->latchsets) )
2189                 continue;
2190
2191         return mgr->err;
2192   }
2193
2194   //  copy key and value onto page and update slot
2195
2196   set->page->min -= vallen + sizeof(BtVal);
2197   val = (BtVal*)((unsigned char *)set->page + set->page->min);
2198   memcpy (val->value, value, vallen);
2199   val->len = vallen;
2200
2201   set->page->min -= keylen + sizeof(BtKey);
2202   ptr = (BtKey*)((unsigned char *)set->page + set->page->min);
2203   memcpy (ptr->key, key, keylen);
2204   ptr->len = keylen;
2205         
2206   slotptr(set->page,slot)->off = set->page->min;
2207
2208 insxit:
2209   bt_unlockpage(BtLockWrite, set->latch, __LINE__);
2210   bt_unpinlatch (set->latch);
2211   return 0;
2212 }
2213
2214 //      determine actual page where key is located
2215 //  return slot number
2216
2217 uint bt_atomicpage (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx, BtPageSet *set)
2218 {
2219 BtKey *key = keyptr(source,locks[idx].src), *ptr;
2220 uint slot = locks[idx].slot;
2221 uint entry;
2222
2223         if( locks[idx].reuse )
2224           entry = locks[idx-1].entry;
2225         else
2226           entry = locks[idx].entry;
2227
2228         if( slot ) {
2229                 set->latch = mgr->latchsets + entry;
2230                 set->page = bt_mappage (mgr, set->latch);
2231                 return slot;
2232         }
2233
2234         //      find where our key is located 
2235         //      on current page or pages split on
2236         //      same page txn operations.
2237
2238         do {
2239                 set->latch = mgr->latchsets + entry;
2240                 set->page = bt_mappage (mgr, set->latch);
2241
2242                 if( slot = bt_findslot(set->page, key->key, key->len) ) {
2243                   if( slotptr(set->page, slot)->type == Librarian )
2244                         slot++;
2245                   if( locks[idx].reuse )
2246                         locks[idx].entry = entry;
2247                   return slot;
2248                 }
2249         } while( entry = set->latch->split );
2250
2251         mgr->line = __LINE__, mgr->err = BTERR_atomic;
2252         return 0;
2253 }
2254
2255 BTERR bt_atomicinsert (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx)
2256 {
2257 BtKey *key = keyptr(source, locks[idx].src);
2258 BtVal *val = valptr(source, locks[idx].src);
2259 BtLatchSet *latch;
2260 BtPageSet set[1];
2261 uint entry, slot;
2262
2263   while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) {
2264         if( slot = bt_cleanpage(mgr, set, key->len, slot, val->len) ) {
2265           if( bt_insertslot (mgr, set, slot, key->key, key->len, val->value, val->len, slotptr(source,locks[idx].src)->type) )
2266                 return mgr->err;
2267
2268           return 0;
2269         }
2270
2271         //  split page
2272
2273         if( entry = bt_splitpage (mgr, set, 0) )
2274           latch = mgr->latchsets + entry;
2275         else
2276           return mgr->err;
2277
2278         //      splice right page into split chain
2279         //      and WriteLock it
2280
2281         bt_lockpage(BtLockWrite, latch, 0, __LINE__);
2282         latch->split = set->latch->split;
2283         set->latch->split = entry;
2284
2285         // clear slot number for atomic page
2286
2287         locks[idx].slot = 0;
2288   }
2289
2290   return mgr->line = __LINE__, mgr->err = BTERR_atomic;
2291 }
2292
2293 //      perform delete from smaller btree
2294 //  insert a delete slot if not found there
2295
2296 BTERR bt_atomicdelete (BtMgr *mgr, BtPage source, AtomicTxn *locks, uint idx)
2297 {
2298 BtKey *key = keyptr(source, locks[idx].src);
2299 BtLatchSet *latch;
2300 uint slot, entry;
2301 BtPageSet set[1];
2302 BtSlot *node;
2303 BtKey *ptr;
2304 BtVal *val;
2305
2306   while( slot = bt_atomicpage (mgr, source, locks, idx, set) ) {
2307         node = slotptr(set->page, slot);
2308         ptr = keyptr(set->page, slot);
2309         val = valptr(set->page, slot);
2310
2311         //  if slot is not found on cache btree, insert a delete slot
2312         //      otherwise ignore the request.
2313
2314         if( keycmp (ptr, key->key, key->len) )
2315          if( !mgr->type )
2316           if( slot = bt_cleanpage(mgr, set, key->len, slot, 0) )
2317                 return bt_insertslot (mgr, set, slot, key->key, key->len, NULL, 0, Delete);
2318           else { //  split page before inserting Delete slot
2319                 if( entry = bt_splitpage (mgr, set, 0) )
2320                   latch = mgr->latchsets + entry;
2321                 else
2322               return mgr->err;
2323
2324                 //      splice right page into split chain
2325                 //      and WriteLock it
2326
2327                 bt_lockpage(BtLockWrite, latch, 0, __LINE__);
2328                 latch->split = set->latch->split;
2329                 set->latch->split = entry;
2330
2331                 // clear slot number for atomic page
2332
2333                 locks[idx].slot = 0;
2334                 continue;
2335           }
2336          else
2337            return 0;
2338
2339         //      if node is already dead,
2340         //      ignore the request.
2341
2342         if( node->type == Delete || node->dead )
2343                 return 0;
2344
2345         //  if main LSM btree, delete the slot
2346         //      else change to delete type.
2347
2348         if( mgr->type ) {
2349                 set->page->act--;
2350                 node->dead = 1;
2351         } else
2352                 node->type = Delete;
2353
2354         __sync_fetch_and_add(&mgr->found, 1);
2355         return 0;
2356   }
2357
2358   return mgr->line = __LINE__, mgr->err = BTERR_struct;
2359 }
2360
2361 //      release master's splits from right to left
2362
2363 void bt_atomicrelease (BtMgr *mgr, uint entry)
2364 {
2365 BtLatchSet *latch = mgr->latchsets + entry;
2366
2367         if( latch->split )
2368                 bt_atomicrelease (mgr, latch->split);
2369
2370         latch->split = 0;
2371         bt_unlockpage(BtLockWrite, latch, __LINE__);
2372         bt_unpinlatch(latch);
2373 }
2374
2375 int qsortcmp (BtSlot *slot1, BtSlot *slot2, BtPage page)
2376 {
2377 BtKey *key1 = (BtKey *)((char *)page + slot1->off);
2378 BtKey *key2 = (BtKey *)((char *)page + slot2->off);
2379
2380         return keycmp (key1, key2->key, key2->len);
2381 }
2382 //      atomic modification of a batch of keys.
2383
2384 BTERR bt_atomictxn (BtDb *bt, BtPage source)
2385 {
2386 uint src, idx, slot, samepage, entry, que = 0;
2387 BtKey *key, *ptr, *key2;
2388 int result = 0;
2389 BtSlot temp[1];
2390 int type;
2391
2392   // stable sort the list of keys into order to
2393   //    prevent deadlocks between threads.
2394 /*
2395   for( src = 1; src++ < source->cnt; ) {
2396         *temp = *slotptr(source,src);
2397         key = keyptr (source,src);
2398
2399         for( idx = src; --idx; ) {
2400           key2 = keyptr (source,idx);
2401           if( keycmp (key, key2->key, key2->len) < 0 ) {
2402                 *slotptr(source,idx+1) = *slotptr(source,idx);
2403                 *slotptr(source,idx) = *temp;
2404           } else
2405                 break;
2406         }
2407   }
2408 */
2409         qsort_r (slotptr(source,1), source->cnt, sizeof(BtSlot), (__compar_d_fn_t)qsortcmp, source);
2410
2411   //  perform the individual actions in the transaction
2412
2413   if( bt_atomicexec (bt->mgr, source, source->cnt, bt->tid) )
2414         return bt->mgr->err;
2415
2416   // if number of active pages
2417   // is greater than the buffer pool
2418   // promote page into larger btree
2419
2420   if( bt->main )
2421    if( bt->mgr->pagezero->leafpages > bt->mgr->maxleaves )
2422         if( bt_promote (bt) )
2423           return bt->mgr->err;
2424
2425   // return success
2426
2427   return 0;
2428 }
2429
2430 //      execute the source list of inserts/deletes
2431
2432 BTERR bt_atomicexec(BtMgr *mgr, BtPage source, uint count, pid_t tid)
2433 {
2434 uint slot, src, idx, samepage, entry, outidx;
2435 BtPageSet set[1], prev[1];
2436 unsigned char value[BtId];
2437 BtLatchSet *latch;
2438 uid right_page_no;
2439 AtomicTxn *locks;
2440 BtKey *key, *ptr;
2441 BtPage page;
2442 BtVal *val;
2443
2444   locks = calloc (count, sizeof(AtomicTxn));
2445   memset (set, 0, sizeof(BtPageSet));
2446   outidx = 0;
2447
2448   // Load the leaf page for each key
2449   // group same page references with reuse bit
2450
2451   for( src = 0; src++ < count; ) {
2452         if( slotptr(source,src)->dead )
2453           continue;
2454
2455         key = keyptr(source, src);
2456
2457         // first determine if this modification falls
2458         // on the same page as the previous modification
2459         //      note that the far right leaf page is a special case
2460
2461         if( samepage = !!set->page )
2462           samepage = !set->page->right || keycmp (ptr, key->key, key->len) >= 0;
2463
2464         if( !samepage )
2465           if( slot = bt_loadpage(mgr, set, key->key, key->len, 0, BtLockWrite, tid) )
2466                 ptr = fenceptr(set->page), set->latch->split = 0;
2467           else
2468                 return mgr->err;
2469         else
2470           slot = 0;
2471
2472         if( slot )
2473          if( slotptr(set->page, slot)->type == Librarian )
2474           slot++;
2475
2476         entry = set->latch - mgr->latchsets;
2477         locks[outidx].reuse = samepage;
2478         locks[outidx].entry = entry;
2479         locks[outidx].slot = slot;
2480         locks[outidx].src = src;
2481         outidx++;
2482   }
2483
2484   // insert or delete each key
2485   // process any splits or merges
2486   // run through txn list backwards
2487
2488   samepage = outidx;
2489
2490   for( src = outidx; src--; ) {
2491         if( locks[src].reuse )
2492           continue;
2493
2494         //  perform the txn operations
2495         //      from smaller to larger on
2496         //  the same page
2497
2498         for( idx = src; idx < samepage; idx++ )
2499          switch( slotptr(source,locks[idx].src)->type ) {
2500          case Delete:
2501           if( bt_atomicdelete (mgr, source, locks, idx) )
2502                 return mgr->err;
2503           break;
2504
2505         case Duplicate:
2506         case Unique:
2507           if( bt_atomicinsert (mgr, source, locks, idx) )
2508                 return mgr->err;
2509           break;
2510
2511         default:
2512           bt_atomicpage (mgr, source, locks, idx, set);
2513           break;
2514         }
2515
2516         //      after the same page operations have finished,
2517         //  process master page for splits or deletion.
2518
2519         latch = prev->latch = mgr->latchsets + locks[src].entry;
2520         prev->page = bt_mappage (mgr, prev->latch);
2521         samepage = src;
2522
2523         //  pick-up all splits from master page
2524         //      each one is already pinned & WriteLocked.
2525
2526         while( entry = prev->latch->split ) {
2527           set->latch = mgr->latchsets + entry;
2528           set->page = bt_mappage (mgr, set->latch);
2529
2530           // delete empty master page by undoing its split
2531           //  (this is potentially another empty page)
2532           //  note that there are no pointers to it yet
2533
2534           if( !prev->page->act ) {
2535                 set->page->left = prev->page->left;
2536                 memcpy (prev->page, set->page, mgr->page_size << mgr->leaf_xtra);
2537                 bt_lockpage (BtLockDelete, set->latch, 0, __LINE__);
2538                 bt_lockpage (BtLockLink, set->latch, 0, __LINE__);
2539                 prev->latch->split = set->latch->split;
2540                 bt_freepage (mgr, set);
2541                 continue;
2542           }
2543
2544           // remove empty split page from the split chain
2545           // and return it to the free list. No other
2546           // thread has its page number yet.
2547
2548           if( !set->page->act ) {
2549                 prev->page->right = set->page->right;
2550                 prev->latch->split = set->latch->split;
2551
2552                 bt_lockpage (BtLockDelete, set->latch, 0, __LINE__);
2553                 bt_lockpage (BtLockLink, set->latch, 0, __LINE__);
2554                 bt_freepage (mgr, set);
2555                 continue;
2556           }
2557
2558           //  update prev's fence key
2559
2560           ptr = fenceptr(prev->page);
2561           bt_putid (value, prev->latch->page_no);
2562
2563           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Unique) )
2564                 return mgr->err;
2565
2566           // splice in the left link into the split page
2567
2568           set->page->left = prev->latch->page_no;
2569           *prev = *set;
2570         }
2571
2572         //  update left pointer in next right page from last split page
2573         //      (if all splits were reversed or none occurred, latch->split == 0)
2574
2575         if( latch->split ) {
2576           //  fix left pointer in master's original (now split)
2577           //  far right sibling or set rightmost page in page zero
2578
2579           if( right_page_no = prev->page->right ) {
2580                 if( set->latch = bt_pinlatch (mgr, right_page_no) )
2581                   set->page = bt_mappage (mgr, set->latch);
2582                 else
2583                   return mgr->err;
2584
2585             bt_lockpage (BtLockLink, set->latch, 0, __LINE__);
2586             set->page->left = prev->latch->page_no;
2587             bt_unlockpage (BtLockLink, set->latch, __LINE__);
2588                 bt_unpinlatch (set->latch);
2589           } else {      // prev is rightmost page
2590             bt_mutexlock (mgr->pagezero->lock);
2591                 mgr->pagezero->rightleaf = prev->latch->page_no;
2592             bt_releasemutex(mgr->pagezero->lock);
2593           }
2594
2595           //  switch the original fence key from the
2596           //  master page to the last split page.
2597
2598           ptr = fenceptr(prev->page);
2599           bt_putid (value, prev->latch->page_no);
2600
2601           if( bt_insertkey (mgr, ptr->key, ptr->len, 1, value, BtId, Update) )
2602                 return mgr->err;
2603
2604           //  unlock and unpin the split pages
2605
2606           bt_atomicrelease (mgr, latch->split);
2607
2608           //  unlock and unpin the master page
2609
2610           latch->split = 0;
2611           bt_unlockpage(BtLockWrite, latch, __LINE__);
2612           bt_unpinlatch(latch);
2613           continue;
2614         }
2615
2616         //      since there are no splits remaining, we're
2617         //  finished if master page occupied
2618
2619         if( prev->page->act ) {
2620           bt_unlockpage(BtLockWrite, prev->latch, __LINE__);
2621           bt_unpinlatch(prev->latch);
2622           continue;
2623         }
2624
2625         // any and all splits were reversed, and the
2626         // master page located in prev is empty, delete it
2627
2628         if( bt_deletepage (mgr, prev, 0) )
2629                 return mgr->err;
2630   }
2631
2632   // delete the slots
2633
2634   for( idx = 0; idx++ < count; ) {
2635         if( slotptr(source,idx)->dead )
2636           continue;
2637
2638         slotptr(source,idx)->dead = 1;
2639         source->act--;
2640   }
2641
2642   free (locks);
2643   return 0;
2644 }
2645
2646 //  pick & promote a page into the larger btree
2647
2648 BTERR bt_promote (BtDb *bt)
2649 {
2650 BtPageSet set[1];
2651 uint slot, idx;
2652 BtSlot *node;
2653 uid page_no;
2654 BtKey *ptr;
2655 BtVal *val;
2656
2657   bt_mutexlock(bt->mgr->pagezero->promote);
2658
2659   while( 1 ) {
2660         if( bt->mgr->pagezero->leafpromote < bt->mgr->pagezero->allocpage )
2661                 page_no = bt->mgr->pagezero->leafpromote;
2662         else
2663                 page_no = bt->mgr->pagezero->leaf_page;
2664
2665         bt->mgr->pagezero->leafpromote = page_no + (1 << bt->mgr->leaf_xtra);
2666
2667         if( page_no < bt->mgr->pagezero->leaf_page )
2668                 continue;
2669
2670         if( set->latch = bt_pinlatch (bt->mgr, page_no) )
2671                 set->page = bt_mappage (bt->mgr,set->latch);
2672
2673         //      skip upper level pages
2674
2675         if( set->page->lvl ) {
2676           set->latch->pin--;
2677           bt_releasemutex(set->latch->modify);
2678           continue;
2679         }
2680
2681         if( !bt_mutextry(set->latch->modify) ) {
2682           set->latch->pin--;
2683           bt_releasemutex(set->latch->modify);
2684           continue;
2685         }
2686
2687         //  skip this page if it was pinned
2688
2689         if( set->latch->pin > 1 ) {
2690           set->latch->pin--;
2691           bt_releasemutex(set->latch->modify);
2692           continue;
2693         }
2694
2695         // page has no right sibling
2696
2697         if( !set->page->right ) {
2698           set->latch->pin--;
2699           bt_releasemutex(set->latch->modify);
2700           continue;
2701         }
2702
2703         // page is being killed or constructed
2704
2705         if( set->page->nopromote || set->page->kill ) {
2706           set->latch->pin--;
2707           bt_releasemutex(set->latch->modify);
2708           continue;
2709         }
2710
2711         //      leave it locked for the
2712         //      duration of the promotion.
2713
2714         bt_releasemutex(bt->mgr->pagezero->promote);
2715         bt_lockpage (BtLockWrite, set->latch, 0, __LINE__);
2716         bt_releasemutex(set->latch->modify);
2717
2718         // transfer slots in our selected page to the main btree
2719
2720 if( !((page_no>>bt->mgr->leaf_xtra)%100) )
2721 fprintf(stderr, "Promote page %lld, %d keys\n", page_no, set->page->act);
2722
2723         if( bt_atomicexec (bt->main, set->page, set->page->cnt, bt->tid) ) {
2724                 fprintf (stderr, "Promote error = %d line = %d\n", bt->main->err, bt->main->line);
2725                 return bt->main->err;
2726         }
2727
2728         //  now delete the page
2729
2730         if( bt_deletepage (bt->mgr, set, 0) )
2731                 fprintf (stderr, "Promote: delete page err = %d\n", bt->mgr->err);
2732
2733         return bt->mgr->err;
2734   }
2735 }
2736
2737 //      find unique key == given key, or first duplicate key in
2738 //      leaf level and return number of value bytes
2739 //      or (-1) if not found.
2740
2741 int bt_findkey (BtDb *bt, unsigned char *key, uint keylen, unsigned char *value, uint valmax)
2742 {
2743 int ret = -1, type;
2744 BtPageSet set[1];
2745 BtSlot *node;
2746 BtKey *ptr;
2747 BtVal *val;
2748 uint slot;
2749
2750  for( type = 0; type < 2; type++ )
2751   if( slot = bt_loadpage (type ? bt->main : bt->mgr, set, key, keylen, 0, BtLockRead, 0) ) {
2752         node = slotptr(set->page, slot);
2753
2754         //      skip librarian slot place holder
2755
2756         if( node->type == Librarian )
2757           node = slotptr(set->page, ++slot);
2758
2759         ptr = keyptr(set->page, slot);
2760
2761         //      not there if we reach the stopper key
2762         //  or the key doesn't match what's on the page.
2763
2764         if( slot == set->page->cnt )
2765           if( !set->page->right ) {
2766                 bt_unlockpage (BtLockRead, set->latch, __LINE__);
2767                 bt_unpinlatch (set->latch);
2768                 continue;
2769         }
2770
2771         if( keycmp (ptr, key, keylen) ) {
2772                 bt_unlockpage (BtLockRead, set->latch, __LINE__);
2773                 bt_unpinlatch (set->latch);
2774                 continue;
2775         }
2776
2777         // key matches, return >= 0 value bytes copied
2778         //      or -1 if not there.
2779
2780         if( node->type == Delete || node->dead ) {
2781                 ret = -1;
2782                 goto findxit;
2783         }
2784
2785         val = valptr (set->page,slot);
2786
2787         if( valmax > val->len )
2788                 valmax = val->len;
2789
2790         memcpy (value, val->value, valmax);
2791         ret = valmax;
2792         goto findxit;
2793   }
2794
2795   ret = -1;
2796
2797 findxit:
2798   if( type < 2 ) {
2799         bt_unlockpage (BtLockRead, set->latch, __LINE__);
2800         bt_unpinlatch (set->latch);
2801   }
2802   return ret;
2803 }
2804
2805 //      set cursor to highest slot on right-most page
2806
2807 BTERR bt_lastkey (BtDb *bt)
2808 {
2809 uid cache_page_no = bt->mgr->pagezero->rightleaf;
2810 uid main_page_no = bt->main->pagezero->rightleaf;
2811
2812         if( bt->cacheset->latch = bt_pinlatch (bt->mgr, cache_page_no) )
2813                 bt->cacheset->page = bt_mappage (bt->mgr, bt->cacheset->latch);
2814         else
2815                 return bt->mgr->err;
2816
2817     bt_lockpage(BtLockRead, bt->cacheset->latch, 0, __LINE__);
2818         bt->cacheslot = bt->cacheset->page->cnt;
2819
2820         if( bt->mainset->latch = bt_pinlatch (bt->main, main_page_no) )
2821                 bt->mainset->page = bt_mappage (bt->main, bt->mainset->latch);
2822         else
2823                 return bt->main->err;
2824
2825     bt_lockpage(BtLockRead, bt->mainset->latch, 0, __LINE__);
2826         bt->mainslot = bt->mainset->page->cnt;
2827         bt->phase = 2;
2828         return 0;
2829 }
2830
2831 //      return previous slot on cursor page
2832
2833 uint bt_prevslot (BtMgr *mgr, BtPageSet *set, uint slot)
2834 {
2835 uid next, us = set->latch->page_no;
2836
2837   while( 1 ) {
2838         while( --slot )
2839           if( slotptr(set->page, slot)->dead )
2840                 continue;
2841           else
2842                 return slot;
2843
2844         next = set->page->left;
2845
2846         if( !next )
2847                 return 0;
2848
2849         do {
2850           bt_unlockpage(BtLockRead, set->latch, __LINE__);
2851           bt_unpinlatch (set->latch);
2852
2853           if( set->latch = bt_pinlatch (mgr, next) )
2854             set->page = bt_mappage (mgr, set->latch);
2855           else
2856             return 0;
2857
2858       bt_lockpage(BtLockRead, set->latch, 0, __LINE__);
2859           next = set->page->right;
2860
2861         } while( next != us );
2862
2863     slot = set->page->cnt + 1;
2864   }
2865 }
2866
2867 //  advance to previous key
2868
2869 BTERR bt_prevkey (BtDb *bt)
2870 {
2871 int cmp;
2872
2873   // first advance last key(s) one previous slot
2874
2875   while( 1 ) {
2876         switch( bt->phase ) {
2877         case 0:
2878           bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot);
2879           break;
2880         case 1:
2881           bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot);
2882           break;
2883         case 2:
2884           bt->cacheslot = bt_prevslot (bt->mgr, bt->cacheset, bt->cacheslot);
2885           bt->mainslot = bt_prevslot (bt->main, bt->mainset, bt->mainslot);
2886         break;
2887         }
2888
2889   // return next key
2890
2891         if( bt->cacheslot ) {
2892           bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
2893           bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
2894           bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
2895         }
2896
2897         if( bt->mainslot ) {
2898           bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
2899           bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
2900           bt->mainval = valptr(bt->mainset->page, bt->mainslot);
2901         }
2902
2903         if( bt->mainslot && bt->cacheslot )
2904           cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
2905         else if( bt->cacheslot )
2906           cmp = 1;
2907         else if( bt->mainslot )
2908           cmp = -1;
2909         else
2910           return 0;
2911
2912         //  cache key is larger
2913
2914         if( cmp > 0 ) {
2915           bt->phase = 0;
2916           if( bt->cachenode->type == Delete )
2917                 continue;
2918           return bt->cacheslot;
2919         }
2920
2921         //  main key is larger
2922
2923         if( cmp < 0 ) {
2924           bt->phase = 1;
2925           return bt->mainslot;
2926         }
2927
2928         //      keys are equal
2929
2930         bt->phase = 2;
2931
2932         if( bt->cachenode->type == Delete )
2933                 continue;
2934
2935         return bt->cacheslot;
2936   }
2937 }
2938
2939 //      advance to next slot in cache or main btree
2940 //      return 0 for EOF/error
2941
2942 uint bt_nextslot (BtMgr *mgr, BtPageSet *set, uint slot)
2943 {
2944 BtPage page;
2945 uid page_no;
2946
2947   while( 1 ) {
2948         while( slot++ < set->page->cnt )
2949           if( slotptr(set->page, slot)->dead )
2950                 continue;
2951           else if( slot < set->page->cnt || set->page->right )
2952                 return slot;
2953           else
2954                 return 0;
2955
2956         bt_unlockpage(BtLockRead, set->latch, __LINE__);
2957         bt_unpinlatch (set->latch);
2958
2959         if( page_no = set->page->right )
2960           if( set->latch = bt_pinlatch (mgr, page_no) )
2961                 set->page = bt_mappage (mgr, set->latch);
2962           else
2963                 return 0;
2964         else
2965           return 0; // EOF
2966
2967         // obtain access lock using lock chaining with Access mode
2968
2969         bt_lockpage(BtLockAccess, set->latch, 0, __LINE__);
2970         bt_lockpage(BtLockRead, set->latch, 0, __LINE__);
2971         bt_unlockpage(BtLockAccess, set->latch, __LINE__);
2972         slot = 0;
2973   }
2974 }
2975
2976 //  advance to next key
2977
2978 BTERR bt_nextkey (BtDb *bt)
2979 {
2980 int cmp;
2981
2982   // first advance last key(s) one next slot
2983
2984   while( 1 ) {
2985         switch( bt->phase ) {
2986         case 0:
2987           bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot);
2988           break;
2989         case 1:
2990           bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot);
2991           break;
2992         case 2:
2993           bt->cacheslot = bt_nextslot (bt->mgr, bt->cacheset, bt->cacheslot);
2994           bt->mainslot = bt_nextslot (bt->main, bt->mainset, bt->mainslot);
2995         break;
2996         }
2997
2998   // return next key
2999
3000         if( bt->cacheslot ) {
3001           bt->cachenode = slotptr(bt->cacheset->page, bt->cacheslot);
3002           bt->cachekey = keyptr(bt->cacheset->page, bt->cacheslot);
3003           bt->cacheval = valptr(bt->cacheset->page, bt->cacheslot);
3004         }
3005
3006         if( bt->mainslot ) {
3007           bt->mainnode = slotptr(bt->mainset->page, bt->mainslot);
3008           bt->mainkey = keyptr(bt->mainset->page, bt->mainslot);
3009           bt->mainval = valptr(bt->mainset->page, bt->mainslot);
3010         }
3011
3012         if( bt->mainslot && bt->cacheslot )
3013           cmp = keycmp (bt->cachekey, bt->mainkey->key, bt->mainkey->len);
3014         else if( bt->mainslot )
3015           cmp = 1;
3016         else if( bt->cacheslot )
3017           cmp = -1;
3018         else
3019           return 0;
3020
3021         //  main key is larger
3022         //      return smaller key
3023
3024         if( cmp < 0 ) {
3025           bt->phase = 0;
3026           if( bt->cachenode->type == Delete )
3027                 continue;
3028           return bt->cacheslot;
3029         }
3030
3031         //  cache key is larger
3032
3033         if( cmp > 0 ) {
3034           bt->phase = 1;
3035           return bt->mainslot;
3036         }
3037
3038         //      keys are equal
3039
3040         bt->phase = 2;
3041
3042         if( bt->cachenode->type == Delete )
3043                 continue;
3044
3045         return bt->cacheslot;
3046   }
3047 }
3048
3049 //  start sweep of keys
3050
3051 BTERR bt_startkey (BtDb *bt, unsigned char *key, uint len)
3052 {
3053 BtPageSet set[1];
3054 uint slot;
3055
3056         // cache btree page
3057
3058         if( slot = bt_loadpage (bt->mgr, bt->cacheset, key, len, 0, BtLockRead, 0) )
3059                 bt->cacheslot = slot - 1;
3060         else
3061           return bt->mgr->err;
3062
3063         // main btree page
3064
3065         if( slot = bt_loadpage (bt->main, bt->mainset, key, len, 0, BtLockRead, 0) )
3066                 bt->mainslot = slot - 1;
3067         else
3068           return bt->mgr->err;
3069
3070         bt->phase = 2;
3071         return 0;
3072 }
3073
3074 //      flush cache pages to main btree
3075
3076 BTERR bt_flushmain (BtDb *bt)
3077 {
3078 uint count, cnt = 0;
3079 BtPageSet set[1];
3080
3081   while( bt->mgr->pagezero->leafpages > 0 ) {
3082         if( set->latch = bt_pinlatch (bt->mgr, bt->mgr->pagezero->leaf_page) )
3083           set->page = bt_mappage (bt->mgr, set->latch);
3084         else
3085           return bt->mgr->err;
3086
3087         bt_lockpage(BtLockWrite, set->latch, 0, __LINE__);
3088         count = set->page->cnt;
3089
3090         if( !set->page->right )
3091                 count--;
3092
3093 if( !(cnt++ % 100) )
3094 fprintf(stderr, "Promote LEAF_page %d with %d keys\n", cnt, set->page->act);
3095
3096         if( bt_atomicexec (bt->main, set->page, count, bt->tid) )
3097           return bt->mgr->line = bt->main->line, bt->mgr->err = bt->main->err;
3098
3099         if( set->page->right )
3100           if( bt_deletepage (bt->mgr, set, 0) )
3101                 return bt->mgr->err;
3102           else
3103                 continue;
3104
3105         bt_unlockpage(BtLockWrite, set->latch, __LINE__);
3106         bt_unpinlatch (set->latch);
3107         return 0;
3108   }
3109
3110   //  leaf page count is off
3111
3112   bt->mgr->line = __LINE__;
3113   return bt->mgr->err = BTERR_ovflw;
3114 }
3115
3116 #ifdef STANDALONE
3117
3118 #ifndef unix
3119 double getCpuTime(int type)
3120 {
3121 FILETIME crtime[1];
3122 FILETIME xittime[1];
3123 FILETIME systime[1];
3124 FILETIME usrtime[1];
3125 SYSTEMTIME timeconv[1];
3126 double ans = 0;
3127
3128         memset (timeconv, 0, sizeof(SYSTEMTIME));
3129
3130         switch( type ) {
3131         case 0:
3132                 GetSystemTimeAsFileTime (xittime);
3133                 FileTimeToSystemTime (xittime, timeconv);
3134                 ans = (double)timeconv->wDayOfWeek * 3600 * 24;
3135                 break;
3136         case 1:
3137                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3138                 FileTimeToSystemTime (usrtime, timeconv);
3139                 break;
3140         case 2:
3141                 GetProcessTimes (GetCurrentProcess(), crtime, xittime, systime, usrtime);
3142                 FileTimeToSystemTime (systime, timeconv);
3143                 break;
3144         }
3145
3146         ans += (double)timeconv->wHour * 3600;
3147         ans += (double)timeconv->wMinute * 60;
3148         ans += (double)timeconv->wSecond;
3149         ans += (double)timeconv->wMilliseconds / 1000;
3150         return ans;
3151 }
3152 #else
3153 #include <time.h>
3154 #include <sys/resource.h>
3155
3156 double getCpuTime(int type)
3157 {
3158 struct rusage used[1];
3159 struct timeval tv[1];
3160
3161         switch( type ) {
3162         case 0:
3163                 gettimeofday(tv, NULL);
3164                 return (double)tv->tv_sec + (double)tv->tv_usec / 1000000;
3165
3166         case 1:
3167                 getrusage(RUSAGE_SELF, used);
3168                 return (double)used->ru_utime.tv_sec + (double)used->ru_utime.tv_usec / 1000000;
3169
3170         case 2:
3171                 getrusage(RUSAGE_SELF, used);
3172                 return (double)used->ru_stime.tv_sec + (double)used->ru_stime.tv_usec / 1000000;
3173         }
3174
3175         return 0;
3176 }
3177 #endif
3178
3179 void bt_poolaudit (BtMgr *mgr, char *type)
3180 {
3181 BtLatchSet *latch, test[1];
3182 uint entry;
3183
3184         memset (test, 0, sizeof(test));
3185
3186         if( memcmp (test, mgr->latchsets, sizeof(test)) )
3187                 fprintf(stderr, "%s latchset zero overwritten\n", type);
3188
3189         for( entry = 0; ++entry < mgr->pagezero->latchtotal; ) {
3190                 latch = mgr->latchsets + entry;
3191
3192                 if( *latch->modify->value )
3193                         fprintf(stderr, "%s latchset %d modifylocked for page %lld\n", type, entry, latch->page_no);
3194
3195                 if( latch->pin )
3196                         fprintf(stderr, "%s latchset %d pinned %d times for page %lld\n", type, entry, latch->pin, latch->page_no);
3197         }
3198 }
3199
3200 typedef struct {
3201         char idx;
3202         char *type;
3203         char *infile;
3204         BtMgr *main;
3205         BtMgr *mgr;
3206         int num;
3207 } ThreadArg;
3208
3209 //  standalone program to index file of keys
3210 //  then list them onto std-out
3211
3212 #ifdef unix
3213 void *index_file (void *arg)
3214 #else
3215 uint __stdcall index_file (void *arg)
3216 #endif
3217 {
3218 int line = 0, found = 0, cnt = 0, cachecnt, idx;
3219 int ch, len = 0, slot, type = 0;
3220 unsigned char key[BT_maxkey];
3221 unsigned char buff[65536];
3222 uint nxt = sizeof(buff);
3223 ThreadArg *args = arg;
3224 uint counts[8][2];
3225 BtPageSet set[1];
3226 BtPage page;
3227 uid page_no;
3228 int vallen;
3229 BtKey *ptr;
3230 BtVal *val;
3231 uint size;
3232 BtDb *bt;
3233 FILE *in;
3234
3235         bt = bt_open (args->mgr, args->main);
3236         page = (BtPage)buff;
3237
3238         if( args->idx < strlen (args->type) )
3239                 ch = args->type[args->idx];
3240         else
3241                 ch = args->type[strlen(args->type) - 1];
3242
3243         switch(ch | 0x20)
3244         {
3245         case 'm':
3246           fprintf(stderr, "started flushing cache to main btree\n");
3247
3248           if( bt->main )
3249                 if( bt_flushmain(bt) )
3250                   fprintf(stderr, "Error %d Line: %d\n", bt->mgr->err, bt->mgr->line), exit(0);
3251
3252           break;
3253
3254         case 'd':
3255                 type = Delete;
3256
3257         case 'p':
3258                 if( !type )
3259                         type = Unique;
3260
3261                 if( args->num )
3262                  if( type == Delete )
3263                   fprintf(stderr, "started TXN pennysort delete for %s\n", args->infile);
3264                  else
3265                   fprintf(stderr, "started TXN pennysort insert for %s\n", args->infile);
3266                 else
3267                  if( type == Delete )
3268                   fprintf(stderr, "started pennysort delete for %s\n", args->infile);
3269                  else
3270                   fprintf(stderr, "started pennysort insert for %s\n", args->infile);
3271
3272                 if( in = fopen (args->infile, "rb") )
3273                   while( ch = getc(in), ch != EOF )
3274                         if( ch == '\n' )
3275                         {
3276                           line++;
3277
3278                           if( !args->num ) {
3279                             if( bt_insertkey (bt->mgr, key, 10, 0, key + 10, len - 10, Unique) )
3280                                   fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3281                             len = 0;
3282                                 continue;
3283                           }
3284
3285                           nxt -= len - 10;
3286                           memcpy (buff + nxt, key + 10, len - 10);
3287                           nxt -= 1;
3288                           buff[nxt] = len - 10;
3289                           nxt -= 10;
3290                           memcpy (buff + nxt, key, 10);
3291                           nxt -= 1;
3292                           buff[nxt] = 10;
3293                           slotptr(page,++cnt)->off  = nxt;
3294                           slotptr(page,cnt)->type = type;
3295                           slotptr(page,cnt)->dead = 0;
3296                           len = 0;
3297
3298                           if( cnt < args->num )
3299                                 continue;
3300
3301                           page->cnt = cnt;
3302                           page->act = cnt;
3303                           page->min = nxt;
3304
3305                           if( bt_atomictxn (bt, page) )
3306                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3307                           nxt = sizeof(buff);
3308                           cnt = 0;
3309
3310                         }
3311                         else if( len < BT_maxkey )
3312                                 key[len++] = ch;
3313                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3314                 break;
3315
3316         case 'w':
3317                 fprintf(stderr, "started indexing for %s\n", args->infile);
3318                 if( in = fopen (args->infile, "r") )
3319                   while( ch = getc(in), ch != EOF )
3320                         if( ch == '\n' )
3321                         {
3322                           line++;
3323
3324                           if( bt_insertkey (bt->mgr, key, len, 0, NULL, 0, Unique) )
3325                                 fprintf(stderr, "Error %d Line: %d source: %d\n", bt->mgr->err, bt->mgr->line, line), exit(0);
3326                           len = 0;
3327                         }
3328                         else if( len < BT_maxkey )
3329                                 key[len++] = ch;
3330                 fprintf(stderr, "finished %s for %d keys\n", args->infile, line);
3331                 break;
3332
3333         case 'f':
3334                 fprintf(stderr, "started finding keys for %s\n", args->infile);
3335                 if( in = fopen (args->infile, "rb") )
3336                   while( ch = getc(in), ch != EOF )
3337                         if( ch == '\n' )
3338                         {
3339                           line++;
3340                           if( bt_findkey (bt, key, len, NULL, 0) == 0 )
3341                                 found++;
3342                           else if( bt->mgr->err )
3343                                 fprintf(stderr, "Error %d Syserr %d Line: %d source: %d\n", bt->mgr->err, errno, bt->mgr->line, line), exit(0);
3344                           len = 0;
3345                         }
3346                         else if( len < BT_maxkey )
3347                                 key[len++] = ch;
3348                 fprintf(stderr, "finished %s for %d keys, found %d\n", args->infile, line, found);
3349                 break;
3350
3351         case 's':
3352                 fprintf(stderr, "started forward scan\n");
3353                 if( bt_startkey (bt, NULL, 0) )
3354                         fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3355
3356                 while( bt_nextkey (bt) ) {
3357                   if( bt->phase == 1 ) {
3358                         len = bt->mainkey->len;
3359
3360                         if( bt->mainnode->type == Duplicate )
3361                                 len -= BtId;
3362
3363                         fwrite (bt->mainkey->key, len, 1, stdout);
3364                         fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3365                   } else {
3366                         len = bt->cachekey->len;
3367
3368                         if( bt->cachenode->type == Duplicate )
3369                                 len -= BtId;
3370
3371                         fwrite (bt->cachekey->key, len, 1, stdout);
3372                         fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3373                   }
3374
3375                   fputc ('\n', stdout);
3376                   cnt++;
3377             }
3378
3379                 bt_unlockpage(BtLockRead, bt->cacheset->latch, __LINE__);
3380                 bt_unpinlatch (bt->cacheset->latch);
3381
3382                 bt_unlockpage(BtLockRead, bt->mainset->latch, __LINE__);
3383                 bt_unpinlatch (bt->mainset->latch);
3384
3385                 fprintf(stderr, " Total keys read %d\n", cnt);
3386                 break;
3387
3388         case 'r':
3389                 fprintf(stderr, "started reverse scan\n");
3390                 if( bt_lastkey (bt) )
3391                         fprintf(stderr, "unable to begin scan error %d Line: %d\n", bt->mgr->err, bt->mgr->line);
3392
3393                 while( bt_prevkey (bt) ) {
3394                   if( bt->phase == 1 ) {
3395                         len = bt->mainkey->len;
3396
3397                         if( bt->mainnode->type == Duplicate )
3398                                 len -= BtId;
3399
3400                         fwrite (bt->mainkey->key, len, 1, stdout);
3401                         fwrite (bt->mainval->value, bt->mainval->len, 1, stdout);
3402                   } else {
3403                         len = bt->cachekey->len;
3404
3405                         if( bt->cachenode->type == Duplicate )
3406                                 len -= BtId;
3407
3408                         fwrite (bt->cachekey->key, len, 1, stdout);
3409                         fwrite (bt->cacheval->value, bt->cacheval->len, 1, stdout);
3410                   }
3411
3412                   fputc ('\n', stdout);
3413                   cnt++;
3414             }
3415
3416                 bt_unlockpage(BtLockRead, bt->cacheset->latch, __LINE__);
3417                 bt_unpinlatch (bt->cacheset->latch);
3418
3419                 bt_unlockpage(BtLockRead, bt->mainset->latch, __LINE__);
3420                 bt_unpinlatch (bt->mainset->latch);
3421
3422                 fprintf(stderr, " Total keys read %d\n", cnt);
3423                 break;
3424
3425         case 'c':
3426                 fprintf(stderr, "started counting LSM cache btree\n");
3427                 memset (counts, 0, sizeof(counts));
3428                 page_no = bt->mgr->pagezero->leaf_page;
3429
3430                 size = bt->mgr->page_size << bt->mgr->leaf_xtra;
3431                 page = malloc(size);
3432
3433 #ifdef unix
3434                 posix_fadvise( bt->mgr->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3435 #endif
3436                 while( page_no < bt->mgr->pagezero->allocpage ) {
3437                   if( bt_readpage (bt->mgr, page, page_no, 0) )
3438                         fprintf(stderr, "Unable to read page %lld from cache\n", page_no), exit(1);
3439                   if( !page->lvl && !page->free ) {
3440                     cnt += page->act;
3441
3442                         for( idx = 0; idx++ < page->cnt; ) {
3443                         BtSlot *node = slotptr (page, idx);
3444                           counts[node->type][node->dead]++;
3445                         }
3446                   }
3447                   page_no += 1 << bt->mgr->leaf_xtra;
3448                 }
3449                 
3450                 cachecnt = --cnt;       // remove stopper key
3451                 counts[Unique][0]--;
3452
3453                 fprintf(stderr, "  Unique    : %d  dead: %d\n", counts[Unique][0], counts[Unique][1]);
3454                 fprintf(stderr, "  Duplicates: %d  dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]);
3455                 fprintf(stderr, "  Librarian : %d  dead: %d\n", counts[Librarian][0], counts[Librarian][1]);
3456                 fprintf(stderr, "  Deletion  : %d  dead: %d\n", counts[Delete][0], counts[Delete][1]);
3457                 fprintf(stderr, "total cache keys count: %d\n", cachecnt);
3458                 free (page);
3459
3460                 fprintf(stderr, "started counting LSM main btree\n");
3461                 memset (counts, 0, sizeof(counts));
3462                 size = bt->main->page_size << bt->main->leaf_xtra;
3463                 page_no = bt->mgr->pagezero->leaf_page;
3464                 page = malloc(size);
3465                 cnt = 0;
3466
3467 #ifdef unix
3468                 posix_fadvise( bt->main->idx, 0, 0, POSIX_FADV_SEQUENTIAL);
3469 #endif
3470                 while( page_no < bt->main->pagezero->allocpage ) {
3471                   if( bt_readpage (bt->main, page, page_no, 0) )
3472                         fprintf(stderr, "Unable to read page %lld from main\n", page_no), exit(1);
3473                   if( !page->lvl && !page->free ) {
3474                     cnt += page->act;
3475
3476                         for( idx = 0; idx++ < page->cnt; ) {
3477                         BtSlot *node = slotptr (page, idx);
3478                           counts[node->type][node->dead]++;
3479                         }
3480                   }
3481                   page_no += 1 << bt->main->leaf_xtra;
3482                 }
3483                 
3484                 cnt--;  // remove stopper key
3485                 counts[Unique][0]--;
3486
3487                 fprintf(stderr, "  Unique    : %d  dead: %d\n", counts[Unique][0], counts[Unique][1]);
3488                 fprintf(stderr, "  Duplicates: %d  dead: %d\n", counts[Duplicate][0], counts[Duplicate][1]);
3489                 fprintf(stderr, "  Librarian : %d  dead: %d\n", counts[Librarian][0], counts[Librarian][1]);
3490                 fprintf(stderr, "  Deletion  : %d  dead: %d\n", counts[Delete][0], counts[Delete][1]);
3491                 fprintf(stderr, "total main keys count : %d\n", cnt);
3492                 fprintf(stderr, "Total keys counted    : %d\n", cnt + cachecnt);
3493                 free (page);
3494                 break;
3495         }
3496
3497         bt_close (bt);
3498 #ifdef unix
3499         return NULL;
3500 #else
3501         return 0;
3502 #endif
3503 }
3504
3505 typedef struct timeval timer;
3506
3507 int main (int argc, char **argv)
3508 {
3509 int idx, cnt, len, slot, err;
3510 double start, stop;
3511 #ifdef unix
3512 pthread_t *threads;
3513 #else
3514 HANDLE *threads;
3515 #endif
3516 ThreadArg *args;
3517 uint mainleafpool = 0;
3518 uint mainleafxtra = 0;
3519 uint maxleaves = 0;
3520 uint poolsize = 0;
3521 uint leafpool = 0;
3522 uint leafxtra = 0;
3523 uint mainpool = 0;
3524 uint mainbits = 0;
3525 int bits = 16;
3526 float elapsed;
3527 int num = 0;
3528 char key[1];
3529 BtMgr *main;
3530 BtMgr *mgr;
3531 BtKey *ptr;
3532
3533         if( argc < 3 ) {
3534                 fprintf (stderr, "Usage: %s idx_file main_file cmds [pagebits leafbits poolsize txnsize mainbits mainleafbits mainpool maxleaves src_file1 src_file2 ... ]\n", argv[0]);
3535                 fprintf (stderr, "  where idx_file is the name of the cache btree file\n");
3536                 fprintf (stderr, "  where main_file is the name of the main btree file\n");
3537                 fprintf (stderr, "  cmds is a string of (r)ev scan/(w)rite/(s)can/(d)elete/(f)ind/(p)ennysort/(c)ount/(m)ainflush, with a one character command for each input src_file. A command can also be given with no input file\n");
3538                 fprintf (stderr, "  pagebits is the page size in bits for the cache btree\n");
3539                 fprintf (stderr, "  leafbits is the number of xtra bits for a leaf page\n");
3540                 fprintf (stderr, "  poolsize is the number of latches in latch pool for the cache btree\n");
3541                 fprintf (stderr, "  txnsize = n to block transactions into n units, or zero for no transactions\n");
3542                 fprintf (stderr, "  mainbits is the page size of the main btree in bits\n");
3543                 fprintf (stderr, "  mainpool is the number of latches in the main latch pool\n");
3544                 fprintf (stderr, "  maxleaves is the threashold for LSM leaf page promotion\n");
3545                 fprintf (stderr, "  src_file1 thru src_filen are files of keys separated by newline\n");
3546                 exit(0);
3547         }
3548
3549         start = getCpuTime(0);
3550
3551         if( argc > 4 )
3552                 bits = atoi(argv[4]);
3553
3554         if( argc > 5 )
3555                 leafxtra = atoi(argv[5]);
3556
3557         if( argc > 6 )
3558                 poolsize = atoi(argv[6]);
3559
3560         if( argc > 7 )
3561                 num = atoi(argv[7]);
3562
3563         if( argc > 8 )
3564                 mainbits = atoi(argv[8]);
3565
3566         if( argc > 9 )
3567                 mainleafxtra = atoi(argv[9]);
3568
3569         if( argc > 10 )
3570                 mainpool = atoi(argv[10]);
3571
3572         if( argc > 11 )
3573                 maxleaves = atoi(argv[11]);
3574
3575         if( argc > 12 )
3576                 cnt = argc - 12;
3577         else
3578                 cnt = 0;
3579
3580 #ifdef unix
3581         threads = malloc (cnt * sizeof(pthread_t));
3582 #else
3583         threads = GlobalAlloc (GMEM_FIXED|GMEM_ZEROINIT, cnt * sizeof(HANDLE));
3584 #endif
3585         args = malloc ((cnt + 1) * sizeof(ThreadArg));
3586
3587         mgr = bt_mgr (argv[1], bits, leafxtra, poolsize);
3588
3589         if( !mgr ) {
3590                 fprintf(stderr, "Index Open Error %s\n", argv[1]);
3591                 exit (1);
3592         } else {
3593                 mgr->maxleaves = maxleaves;
3594                 mgr->type = 0;
3595         }
3596
3597         main = bt_mgr (argv[2], mainbits, mainleafxtra, mainpool);
3598
3599         if( !main ) {
3600                 fprintf(stderr, "Index Open Error %s\n", argv[2]);
3601                 exit (1);
3602         } else
3603                 main->type = 1;
3604
3605         //      fire off threads
3606
3607         if( cnt > 0 )
3608           for( idx = 0; idx < cnt; idx++ ) {
3609                 args[idx].infile = argv[idx + 12];
3610                 args[idx].type = argv[3];
3611                 args[idx].main = main;
3612                 args[idx].mgr = mgr;
3613                 args[idx].num = num;
3614                 args[idx].idx = idx;
3615 #ifdef unix
3616                 if( err = pthread_create (threads + idx, NULL, index_file, args + idx) )
3617                         fprintf(stderr, "Error creating thread %d\n", err);
3618 #else
3619                 threads[idx] = (HANDLE)_beginthreadex(NULL, 131072, index_file, args + idx, 0, NULL);
3620 #endif
3621           }
3622         else {
3623                 args[0].type = argv[3];
3624                 args[0].main = main;
3625                 args[0].mgr = mgr;
3626                 args[0].num = num;
3627                 args[0].idx = 0;
3628                 index_file (args);
3629         }
3630
3631         //      wait for termination
3632
3633 #ifdef unix
3634         for( idx = 0; idx < cnt; idx++ )
3635                 pthread_join (threads[idx], NULL);
3636 #else
3637         WaitForMultipleObjects (cnt, threads, TRUE, INFINITE);
3638
3639         for( idx = 0; idx < cnt; idx++ )
3640                 CloseHandle(threads[idx]);
3641 #endif
3642         bt_poolaudit(mgr, "cache");
3643
3644         if( main )
3645                 bt_poolaudit(main, "main");
3646
3647         fprintf(stderr, "cache %lld leaves %lld upper %d found\n", mgr->pagezero->leafpages, mgr->pagezero->upperpages, mgr->found);
3648         if( main )
3649                 fprintf(stderr, "main %lld leaves %lld upper %d found\n", main->pagezero->leafpages, main->pagezero->upperpages, main->found);
3650
3651         bt_mgrclose (mgr);
3652
3653         if( main )
3654                 bt_mgrclose (main);
3655
3656         elapsed = getCpuTime(0) - start;
3657         fprintf(stderr, " real %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3658         elapsed = getCpuTime(1);
3659         fprintf(stderr, " user %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3660         elapsed = getCpuTime(2);
3661         fprintf(stderr, " sys  %dm%.3fs\n", (int)(elapsed/60), elapsed - (int)(elapsed/60)*60);
3662 }
3663
3664 BtKey *bt_fence (BtPage page)
3665 {
3666 return fenceptr(page);
3667 }
3668
3669 BtKey *bt_key (BtPage page, uint slot)
3670 {
3671 return keyptr(page,slot);
3672 }
3673
3674 BtSlot *bt_slot (BtPage page, uint slot)
3675 {
3676 return slotptr(page,slot);
3677 }
3678 #endif  //STANDALONE